blob: ac299835d63566c43eb08cb0c5dba76634929f4d [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080058import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
61import net.floodlightcontroller.core.annotations.LogMessageDoc;
62import net.floodlightcontroller.core.annotations.LogMessageDocs;
63import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
64import net.floodlightcontroller.core.util.ListenerDispatcher;
65import net.floodlightcontroller.core.web.CoreWebRoutable;
66import net.floodlightcontroller.counter.ICounterStoreService;
67import net.floodlightcontroller.packet.Ethernet;
68import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
69import net.floodlightcontroller.restserver.IRestApiService;
70import net.floodlightcontroller.storage.IResultSet;
71import net.floodlightcontroller.storage.IStorageSourceListener;
72import net.floodlightcontroller.storage.IStorageSourceService;
73import net.floodlightcontroller.storage.OperatorPredicate;
74import net.floodlightcontroller.storage.StorageException;
75import net.floodlightcontroller.threadpool.IThreadPoolService;
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -070076import net.onrc.onos.ofcontroller.core.IOFSwitchPortListener;
HIGUCHI Yuta60a10142013-06-14 15:50:10 -070077import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080078import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080079import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080080import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080081
82import org.jboss.netty.bootstrap.ServerBootstrap;
83import org.jboss.netty.buffer.ChannelBuffer;
84import org.jboss.netty.buffer.ChannelBuffers;
85import org.jboss.netty.channel.Channel;
86import org.jboss.netty.channel.ChannelHandlerContext;
87import org.jboss.netty.channel.ChannelPipelineFactory;
88import org.jboss.netty.channel.ChannelStateEvent;
89import org.jboss.netty.channel.ChannelUpstreamHandler;
90import org.jboss.netty.channel.Channels;
91import org.jboss.netty.channel.ExceptionEvent;
92import org.jboss.netty.channel.MessageEvent;
93import org.jboss.netty.channel.group.ChannelGroup;
94import org.jboss.netty.channel.group.DefaultChannelGroup;
95import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
96import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
97import org.jboss.netty.handler.timeout.IdleStateEvent;
98import org.jboss.netty.handler.timeout.ReadTimeoutException;
99import org.openflow.protocol.OFEchoReply;
100import org.openflow.protocol.OFError;
101import org.openflow.protocol.OFError.OFBadActionCode;
102import org.openflow.protocol.OFError.OFBadRequestCode;
103import org.openflow.protocol.OFError.OFErrorType;
104import org.openflow.protocol.OFError.OFFlowModFailedCode;
105import org.openflow.protocol.OFError.OFHelloFailedCode;
106import org.openflow.protocol.OFError.OFPortModFailedCode;
107import org.openflow.protocol.OFError.OFQueueOpFailedCode;
108import org.openflow.protocol.OFFeaturesReply;
109import org.openflow.protocol.OFGetConfigReply;
110import org.openflow.protocol.OFMessage;
111import org.openflow.protocol.OFPacketIn;
112import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800113import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800114import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800115import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800116import org.openflow.protocol.OFPortStatus.OFPortReason;
117import org.openflow.protocol.OFSetConfig;
118import org.openflow.protocol.OFStatisticsRequest;
119import org.openflow.protocol.OFSwitchConfig;
120import org.openflow.protocol.OFType;
121import org.openflow.protocol.OFVendor;
122import org.openflow.protocol.factory.BasicFactory;
123import org.openflow.protocol.factory.MessageParseException;
124import org.openflow.protocol.statistics.OFDescriptionStatistics;
125import org.openflow.protocol.statistics.OFStatistics;
126import org.openflow.protocol.statistics.OFStatisticsType;
127import org.openflow.protocol.vendor.OFBasicVendorDataType;
128import org.openflow.protocol.vendor.OFBasicVendorId;
129import org.openflow.protocol.vendor.OFVendorId;
130import org.openflow.util.HexString;
131import org.openflow.util.U16;
132import org.openflow.util.U32;
133import org.openflow.vendor.nicira.OFNiciraVendorData;
134import org.openflow.vendor.nicira.OFRoleReplyVendorData;
135import org.openflow.vendor.nicira.OFRoleRequestVendorData;
136import org.openflow.vendor.nicira.OFRoleVendorData;
137import org.slf4j.Logger;
138import org.slf4j.LoggerFactory;
139
140
141/**
142 * The main controller class. Handles all setup and network listeners
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700143 *
144 * Extensions made by ONOS are:
145 * - Detailed Port event: PORTCHANGED -> {PORTCHANGED, PORTADDED, PORTREMOVED}
146 * Available as net.onrc.onos.ofcontroller.core.IOFSwitchPortListener
147 * - Distributed ownership control of switch through RegistryService(IControllerRegistryService)
Pavlin Radoslavovddd01ba2013-07-03 15:40:44 -0700148 * - Register ONOS services. (IFlowService, IControllerRegistryService)
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700149 * - Additional DEBUG logs
150 * - Try using hostname as controller ID, when ID was not explicitly given.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800151 */
152public class Controller implements IFloodlightProviderService,
153 IStorageSourceListener {
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700154
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800155 protected static Logger log = LoggerFactory.getLogger(Controller.class);
156
157 private static final String ERROR_DATABASE =
158 "The controller could not communicate with the system database.";
159
160 protected BasicFactory factory;
161 protected ConcurrentMap<OFType,
162 ListenerDispatcher<OFType,IOFMessageListener>>
163 messageListeners;
164 // The activeSwitches map contains only those switches that are actively
165 // being controlled by us -- it doesn't contain switches that are
166 // in the slave role
167 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
168 // connectedSwitches contains all connected switches, including ones where
169 // we're a slave controller. We need to keep track of them so that we can
170 // send role request messages to switches when our role changes to master
171 // We add a switch to this set after it successfully completes the
172 // handshake. Access to this Set needs to be synchronized with roleChanger
173 protected HashSet<OFSwitchImpl> connectedSwitches;
174
175 // The controllerNodeIPsCache maps Controller IDs to their IP address.
176 // It's only used by handleControllerNodeIPsChanged
177 protected HashMap<String, String> controllerNodeIPsCache;
178
179 protected Set<IOFSwitchListener> switchListeners;
180 protected Set<IHAListener> haListeners;
181 protected Map<String, List<IInfoProvider>> providerMap;
182 protected BlockingQueue<IUpdate> updates;
183
184 // Module dependencies
185 protected IRestApiService restApi;
186 protected ICounterStoreService counterStore = null;
187 protected IStorageSourceService storageSource;
188 protected IPktInProcessingTimeService pktinProcTime;
189 protected IThreadPoolService threadPool;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800190 protected IFlowService flowService;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800191 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800192
193 // Configuration options
194 protected int openFlowPort = 6633;
195 protected int workerThreads = 0;
196 // The id for this controller node. Should be unique for each controller
197 // node in a controller cluster.
198 protected String controllerId = "localhost";
199
200 // The current role of the controller.
201 // If the controller isn't configured to support roles, then this is null.
202 protected Role role;
203 // A helper that handles sending and timeout handling for role requests
204 protected RoleChanger roleChanger;
205
206 // Start time of the controller
207 protected long systemStartTime;
208
209 // Flag to always flush flow table on switch reconnect (HA or otherwise)
210 protected boolean alwaysClearFlowsOnSwAdd = false;
211
212 // Storage table names
213 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
214 protected static final String CONTROLLER_ID = "id";
215
216 protected static final String SWITCH_TABLE_NAME = "controller_switch";
217 protected static final String SWITCH_DATAPATH_ID = "dpid";
218 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
219 protected static final String SWITCH_IP = "ip";
220 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
221 protected static final String SWITCH_ACTIVE = "active";
222 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
223 protected static final String SWITCH_CAPABILITIES = "capabilities";
224 protected static final String SWITCH_BUFFERS = "buffers";
225 protected static final String SWITCH_TABLES = "tables";
226 protected static final String SWITCH_ACTIONS = "actions";
227
228 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
229 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
230
231 protected static final String PORT_TABLE_NAME = "controller_port";
232 protected static final String PORT_ID = "id";
233 protected static final String PORT_SWITCH = "switch_id";
234 protected static final String PORT_NUMBER = "number";
235 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
236 protected static final String PORT_NAME = "name";
237 protected static final String PORT_CONFIG = "config";
238 protected static final String PORT_STATE = "state";
239 protected static final String PORT_CURRENT_FEATURES = "current_features";
240 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
241 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
242 protected static final String PORT_PEER_FEATURES = "peer_features";
243
244 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
245 protected static final String CONTROLLER_INTERFACE_ID = "id";
246 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
247 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
248 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
249 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
250
251
252
253 // Perf. related configuration
254 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
255 protected static final int BATCH_MAX_SIZE = 100;
256 protected static final boolean ALWAYS_DECODE_ETH = true;
257
258 /**
259 * Updates handled by the main loop
260 */
261 protected interface IUpdate {
262 /**
263 * Calls the appropriate listeners
264 */
265 public void dispatch();
266 }
/**
 * Kinds of switch-related events that a {@link SwitchUpdate} can carry.
 */
public enum SwitchUpdateType {
    /** Dispatched through {@code IOFSwitchListener.addedSwitch}. */
    ADDED,
    /** Dispatched through {@code IOFSwitchListener.removedSwitch}. */
    REMOVED,
    /** Dispatched through {@code IOFSwitchListener.switchPortChanged}. */
    PORTCHANGED,
    /** Dispatched through {@code IOFSwitchPortListener.switchPortAdded}. */
    PORTADDED,
    /** Dispatched through {@code IOFSwitchPortListener.switchPortRemoved}. */
    PORTREMOVED
}
274 /**
275 * Update message indicating a switch was added or removed
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700276 * ONOS: This message extended to indicate Port add or removed event.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800277 */
278 protected class SwitchUpdate implements IUpdate {
279 public IOFSwitch sw;
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700280 public OFPhysicalPort port; // Added by ONOS
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800281 public SwitchUpdateType switchUpdateType;
282 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
283 this.sw = sw;
284 this.switchUpdateType = switchUpdateType;
285 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700286 public SwitchUpdate(IOFSwitch sw, OFPhysicalPort port, SwitchUpdateType switchUpdateType) {
287 this.sw = sw;
288 this.port = port;
289 this.switchUpdateType = switchUpdateType;
290 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800291 public void dispatch() {
292 if (log.isTraceEnabled()) {
293 log.trace("Dispatching switch update {} {}",
294 sw, switchUpdateType);
295 }
296 if (switchListeners != null) {
297 for (IOFSwitchListener listener : switchListeners) {
298 switch(switchUpdateType) {
299 case ADDED:
300 listener.addedSwitch(sw);
301 break;
302 case REMOVED:
303 listener.removedSwitch(sw);
304 break;
305 case PORTCHANGED:
306 listener.switchPortChanged(sw.getId());
307 break;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700308 case PORTADDED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700309 if (listener instanceof IOFSwitchPortListener) {
310 ((IOFSwitchPortListener) listener).switchPortAdded(sw.getId(), port);
311 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700312 break;
313 case PORTREMOVED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700314 if (listener instanceof IOFSwitchPortListener) {
315 ((IOFSwitchPortListener) listener).switchPortRemoved(sw.getId(), port);
316 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700317 break;
318 default:
319 break;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800320 }
321 }
322 }
323 }
324 }
325
326 /**
327 * Update message indicating controller's role has changed
328 */
329 protected class HARoleUpdate implements IUpdate {
330 public Role oldRole;
331 public Role newRole;
332 public HARoleUpdate(Role newRole, Role oldRole) {
333 this.oldRole = oldRole;
334 this.newRole = newRole;
335 }
336 public void dispatch() {
337 // Make sure that old and new roles are different.
338 if (oldRole == newRole) {
339 if (log.isTraceEnabled()) {
340 log.trace("HA role update ignored as the old and " +
341 "new roles are the same. newRole = {}" +
342 "oldRole = {}", newRole, oldRole);
343 }
344 return;
345 }
346 if (log.isTraceEnabled()) {
347 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
348 newRole, oldRole);
349 }
350 if (haListeners != null) {
351 for (IHAListener listener : haListeners) {
352 listener.roleChanged(oldRole, newRole);
353 }
354 }
355 }
356 }
357
358 /**
359 * Update message indicating
360 * IPs of controllers in controller cluster have changed.
361 */
362 protected class HAControllerNodeIPUpdate implements IUpdate {
363 public Map<String,String> curControllerNodeIPs;
364 public Map<String,String> addedControllerNodeIPs;
365 public Map<String,String> removedControllerNodeIPs;
366 public HAControllerNodeIPUpdate(
367 HashMap<String,String> curControllerNodeIPs,
368 HashMap<String,String> addedControllerNodeIPs,
369 HashMap<String,String> removedControllerNodeIPs) {
370 this.curControllerNodeIPs = curControllerNodeIPs;
371 this.addedControllerNodeIPs = addedControllerNodeIPs;
372 this.removedControllerNodeIPs = removedControllerNodeIPs;
373 }
374 public void dispatch() {
375 if (log.isTraceEnabled()) {
376 log.trace("Dispatching HA Controller Node IP update "
377 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
378 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
379 removedControllerNodeIPs }
380 );
381 }
382 if (haListeners != null) {
383 for (IHAListener listener: haListeners) {
384 listener.controllerNodeIPsChanged(curControllerNodeIPs,
385 addedControllerNodeIPs, removedControllerNodeIPs);
386 }
387 }
388 }
389 }
390
391 // ***************
392 // Getters/Setters
393 // ***************
394
395 public void setStorageSourceService(IStorageSourceService storageSource) {
396 this.storageSource = storageSource;
397 }
398
399 public void setCounterStore(ICounterStoreService counterStore) {
400 this.counterStore = counterStore;
401 }
402
403 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
404 this.pktinProcTime = pits;
405 }
406
407 public void setRestApiService(IRestApiService restApi) {
408 this.restApi = restApi;
409 }
410
411 public void setThreadPoolService(IThreadPoolService tp) {
412 this.threadPool = tp;
413 }
414
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800415 public void setFlowService(IFlowService serviceImpl) {
416 this.flowService = serviceImpl;
417 }
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800418
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800419 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800420 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800421 }
422
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800423 @Override
424 public Role getRole() {
425 synchronized(roleChanger) {
426 return role;
427 }
428 }
429
430 @Override
431 public void setRole(Role role) {
432 if (role == null) throw new NullPointerException("Role can not be null.");
433 if (role == Role.MASTER && this.role == Role.SLAVE) {
434 // Reset db state to Inactive for all switches.
435 updateAllInactiveSwitchInfo();
436 }
437
438 // Need to synchronize to ensure a reliable ordering on role request
439 // messages send and to ensure the list of connected switches is stable
440 // RoleChanger will handle the actual sending of the message and
441 // timeout handling
442 // @see RoleChanger
443 synchronized(roleChanger) {
444 if (role.equals(this.role)) {
445 log.debug("Ignoring role change: role is already {}", role);
446 return;
447 }
448
449 Role oldRole = this.role;
450 this.role = role;
451
452 log.debug("Submitting role change request to role {}", role);
453 roleChanger.submitRequest(connectedSwitches, role);
454
455 // Enqueue an update for our listeners.
456 try {
457 this.updates.put(new HARoleUpdate(role, oldRole));
458 } catch (InterruptedException e) {
459 log.error("Failure adding update to queue", e);
460 }
461 }
462 }
463
464
465
466 // **********************
467 // ChannelUpstreamHandler
468 // **********************
469
470 /**
471 * Return a new channel handler for processing a switch connections
472 * @param state The channel state object for the connection
473 * @return the new channel handler
474 */
475 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
476 return new OFChannelHandler(state);
477 }
478
Jonathan Hartcc957a02013-02-26 10:39:04 -0800479 protected class RoleChangeCallback implements ControlChangeCallback {
480 @Override
481 public void controlChanged(long dpid, boolean hasControl) {
482 log.info("Role change callback for switch {}, hasControl {}",
483 HexString.toHexString(dpid), hasControl);
484
485 synchronized(roleChanger){
486 OFSwitchImpl sw = null;
487 for (OFSwitchImpl connectedSw : connectedSwitches){
488 if (connectedSw.getId() == dpid){
489 sw = connectedSw;
490 break;
491 }
492 }
493 if (sw == null){
494 log.warn("Switch {} not found in connected switches",
495 HexString.toHexString(dpid));
496 return;
497 }
498
499 Role role = null;
500
Pankaj Berde01939e92013-03-08 14:38:27 -0800501 /*
502 * issue #229
503 * Cannot rely on sw.getRole() as it can be behind due to pending
504 * role changes in the queue. Just submit it and late the RoleChanger
505 * handle duplicates.
506 */
507
508 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800509 role = Role.MASTER;
510 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800511 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800512 role = Role.SLAVE;
513 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800514
515 log.debug("Sending role request {} msg to {}", role, sw);
516 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
517 swList.add(sw);
518 roleChanger.submitRequest(swList, role);
519
Jonathan Hartcc957a02013-02-26 10:39:04 -0800520 }
521
522 }
523 }
524
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800525 /**
526 * Channel handler deals with the switch connection and dispatches
527 * switch messages to the appropriate locations.
528 * @author readams
529 */
530 protected class OFChannelHandler
531 extends IdleStateAwareChannelUpstreamHandler {
532 protected OFSwitchImpl sw;
533 protected OFChannelState state;
534
/**
 * Creates a handler that tracks its connection in the given state object.
 *
 * @param state the channel state object for the connection
 */
public OFChannelHandler(OFChannelState state) {
    this.state = state;
}
538
539 @Override
540 @LogMessageDoc(message="New switch connection from {ip address}",
541 explanation="A new switch has connected from the " +
542 "specified IP address")
543 public void channelConnected(ChannelHandlerContext ctx,
544 ChannelStateEvent e) throws Exception {
545 log.info("New switch connection from {}",
546 e.getChannel().getRemoteAddress());
547
548 sw = new OFSwitchImpl();
549 sw.setChannel(e.getChannel());
550 sw.setFloodlightProvider(Controller.this);
551 sw.setThreadPoolService(threadPool);
552
553 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
554 msglist.add(factory.getMessage(OFType.HELLO));
555 e.getChannel().write(msglist);
556
557 }
558
559 @Override
560 @LogMessageDoc(message="Disconnected switch {switch information}",
561 explanation="The specified switch has disconnected.")
562 public void channelDisconnected(ChannelHandlerContext ctx,
563 ChannelStateEvent e) throws Exception {
564 if (sw != null && state.hsState == HandshakeState.READY) {
565 if (activeSwitches.containsKey(sw.getId())) {
566 // It's safe to call removeSwitch even though the map might
567 // not contain this particular switch but another with the
568 // same DPID
569 removeSwitch(sw);
570 }
571 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700572 if (controlRequested) {
573 registryService.releaseControl(sw.getId());
574 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800575 connectedSwitches.remove(sw);
576 }
577 sw.setConnected(false);
578 }
579 log.info("Disconnected switch {}", sw);
580 }
581
/**
 * Reacts to an exception raised on the switch channel, based on the type
 * of its cause. Most causes are logged and the channel is closed. An
 * already-closed channel is ignored, a storage exception terminates the
 * controller, and a rejected execution is only logged.
 */
@Override
@LogMessageDocs({
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to read timeout",
            explanation="The connected switch has failed to send any " +
                        "messages or respond to echo requests",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch}: failed to " +
                    "complete handshake",
            explanation="The switch did not respond correctly " +
                        "to handshake messages",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to IO Error: {}",
            explanation="There was an error communicating with the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to switch " +
                    "state error: {error}",
            explanation="The switch sent an unexpected message",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to " +
                    "message parse failure",
            explanation="Could not parse a message from the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Terminating controller due to storage exception",
            explanation=ERROR_DATABASE,
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Could not process message: queue full",
            explanation="OpenFlow messages are arriving faster than " +
                        " the controller can process them.",
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Error while processing message " +
                    "from switch {switch} {cause}",
            explanation="An error occurred processing the switch message",
            recommendation=LogMessageDoc.GENERIC_ACTION)
})
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    final Throwable cause = e.getCause();
    // Most branches end by closing the channel; the ones that must not
    // close it clear this flag.
    boolean closeChannel = true;

    if (cause instanceof ReadTimeoutException) {
        // switch timeout
        log.error("Disconnecting switch {} due to read timeout", sw);
    } else if (cause instanceof HandshakeTimeoutException) {
        log.error("Disconnecting switch {}: failed to complete handshake",
                  sw);
    } else if (cause instanceof ClosedChannelException) {
        // Nothing to log or close here.
        closeChannel = false;
    } else if (cause instanceof IOException) {
        log.error("Disconnecting switch {} due to IO Error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof SwitchStateException) {
        log.error("Disconnecting switch {} due to switch state error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof MessageParseException) {
        log.error("Disconnecting switch " + sw +
                  " due to message parse failure",
                  cause);
    } else if (cause instanceof StorageException) {
        log.error("Terminating controller due to storage exception",
                  cause);
        terminate();
        closeChannel = false;
    } else if (cause instanceof RejectedExecutionException) {
        log.warn("Could not process message: queue full");
        closeChannel = false;
    } else {
        log.error("Error while processing message from switch " + sw,
                  cause);
    }

    if (closeChannel) {
        ctx.getChannel().close();
    }
}
661
662 @Override
663 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
664 throws Exception {
665 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
666 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
667 e.getChannel().write(msglist);
668 }
669
670 @Override
671 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
672 throws Exception {
673 if (e.getMessage() instanceof List) {
674 @SuppressWarnings("unchecked")
675 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
676
677 for (OFMessage ofm : msglist) {
678 try {
679 processOFMessage(ofm);
680 }
681 catch (Exception ex) {
682 // We are the last handler in the stream, so run the
683 // exception through the channel again by passing in
684 // ctx.getChannel().
685 Channels.fireExceptionCaught(ctx.getChannel(), ex);
686 }
687 }
688
689 // Flush all flow-mods/packet-out generated from this "train"
690 OFSwitchImpl.flush_all();
691 }
692 }
693
694 /**
695 * Process the request for the switch description
696 */
697 @LogMessageDoc(level="ERROR",
698 message="Exception in reading description " +
699 " during handshake {exception}",
700 explanation="Could not process the switch description string",
701 recommendation=LogMessageDoc.CHECK_SWITCH)
702 void processSwitchDescReply() {
703 try {
704 // Read description, if it has been updated
705 @SuppressWarnings("unchecked")
706 Future<List<OFStatistics>> desc_future =
707 (Future<List<OFStatistics>>)sw.
708 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
709 List<OFStatistics> values =
710 desc_future.get(0, TimeUnit.MILLISECONDS);
711 if (values != null) {
712 OFDescriptionStatistics description =
713 new OFDescriptionStatistics();
714 ChannelBuffer data =
715 ChannelBuffers.buffer(description.getLength());
716 for (OFStatistics f : values) {
717 f.writeTo(data);
718 description.readFrom(data);
719 break; // SHOULD be a list of length 1
720 }
721 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
722 description);
723 sw.setSwitchProperties(description);
724 data = null;
725
726 // At this time, also set other switch properties from storage
727 boolean is_core_switch = false;
728 IResultSet resultSet = null;
729 try {
730 String swid = sw.getStringId();
731 resultSet =
732 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
733 for (Iterator<IResultSet> it =
734 resultSet.iterator(); it.hasNext();) {
735 // In case of multiple rows, use the status
736 // in last row?
737 Map<String, Object> row = it.next().getRow();
738 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
739 if (log.isDebugEnabled()) {
740 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
741 "config for switch={}, is-core={}",
742 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
743 }
744 String ics =
745 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
746 is_core_switch = ics.equals("true");
747 }
748 }
749 }
750 finally {
751 if (resultSet != null)
752 resultSet.close();
753 }
754 if (is_core_switch) {
755 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
756 new Boolean(true));
757 }
758 }
759 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
760 state.hasDescription = true;
761 checkSwitchReady();
762 }
763 catch (InterruptedException ex) {
764 // Ignore
765 }
766 catch (TimeoutException ex) {
767 // Ignore
768 } catch (Exception ex) {
769 log.error("Exception in reading description " +
770 " during handshake", ex);
771 }
772 }
773
774 /**
775 * Send initial switch setup information that we need before adding
776 * the switch
777 * @throws IOException
778 */
779 void sendHelloConfiguration() throws IOException {
780 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700781 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800782 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
783 }
784
785 /**
786 * Send the configuration requests we can only do after we have
787 * the features reply
788 * @throws IOException
789 */
790 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700791 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800792 // Ensure we receive the full packet via PacketIn
793 OFSetConfig config = (OFSetConfig) factory
794 .getMessage(OFType.SET_CONFIG);
795 config.setMissSendLength((short) 0xffff)
796 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
797 sw.write(config, null);
798 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
799 null);
800
801 // Get Description to set switch-specific flags
802 OFStatisticsRequest req = new OFStatisticsRequest();
803 req.setStatisticType(OFStatisticsType.DESC);
804 req.setLengthU(req.getLengthU());
805 Future<List<OFStatistics>> dfuture =
806 sw.getStatistics(req);
807 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
808 dfuture);
809
810 }
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700811
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700812 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800813 protected void checkSwitchReady() {
814 if (state.hsState == HandshakeState.FEATURES_REPLY &&
815 state.hasDescription && state.hasGetConfigReply) {
816
817 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700818 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800819
820 synchronized(roleChanger) {
821 // We need to keep track of all of the switches that are connected
822 // to the controller, in any role, so that we can later send the
823 // role request messages when the controller role changes.
824 // We need to be synchronized while doing this: we must not
825 // send a another role request to the connectedSwitches until
826 // we were able to add this new switch to connectedSwitches
827 // *and* send the current role to the new switch.
828 connectedSwitches.add(sw);
829
830 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800831 //Put the switch in SLAVE mode until we know we have control
832 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
833 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
834 swList.add(sw);
835 roleChanger.submitRequest(swList, Role.SLAVE);
836
Jonathan Hartcc957a02013-02-26 10:39:04 -0800837 //Request control of the switch from the global registry
838 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700839 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800840 registryService.requestControl(sw.getId(),
841 new RoleChangeCallback());
842 } catch (RegistryException e) {
843 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700844 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800845 }
846
Jonathan Hart97801ac2013-02-26 14:29:16 -0800847
Jonathan Hartcc957a02013-02-26 10:39:04 -0800848
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800849 // Send a role request if role support is enabled for the controller
850 // This is a probe that we'll use to determine if the switch
851 // actually supports the role request message. If it does we'll
852 // get back a role reply message. If it doesn't we'll get back an
853 // OFError message.
854 // If role is MASTER we will promote switch to active
855 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800856 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800857 log.debug("This controller's role is {}, " +
858 "sending initial role request msg to {}",
859 role, sw);
860 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
861 swList.add(sw);
862 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800863 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800864 }
865 else {
866 // Role supported not enabled on controller (for now)
867 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800868 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800869 "not sending role request msg to {}",
870 role, sw);
871 // Need to clear FlowMods before we add the switch
872 // and dispatch updates otherwise we have a race condition.
873 sw.clearAllFlowMods();
874 addSwitch(sw);
875 state.firstRoleReplyReceived = true;
876 }
877 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700878 if (!controlRequested) {
879 // yield to allow other thread(s) to release control
880 try {
881 Thread.sleep(10);
882 } catch (InterruptedException e) {
883 // Ignore interruptions
884 }
885 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700886 log.debug("Closing {} because we weren't able to request control " +
887 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700888 sw.channel.close();
889 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800890 }
891 }
892
893 /* Handle a role reply message we received from the switch. Since
894 * netty serializes message dispatch we don't need to synchronize
895 * against other receive operations from the same switch, so no need
896 * to synchronize addSwitch(), removeSwitch() operations from the same
897 * connection.
898 * FIXME: However, when a switch with the same DPID connects we do
899 * need some synchronization. However, handling switches with same
900 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
901 * removedSwitch notification):1
902 *
903 */
904 @LogMessageDoc(level="ERROR",
905 message="Invalid role value in role reply message",
906 explanation="Was unable to set the HA role (master or slave) " +
907 "for the controller.",
908 recommendation=LogMessageDoc.CHECK_CONTROLLER)
909 protected void handleRoleReplyMessage(OFVendor vendorMessage,
910 OFRoleReplyVendorData roleReplyVendorData) {
911 // Map from the role code in the message to our role enum
912 int nxRole = roleReplyVendorData.getRole();
913 Role role = null;
914 switch (nxRole) {
915 case OFRoleVendorData.NX_ROLE_OTHER:
916 role = Role.EQUAL;
917 break;
918 case OFRoleVendorData.NX_ROLE_MASTER:
919 role = Role.MASTER;
920 break;
921 case OFRoleVendorData.NX_ROLE_SLAVE:
922 role = Role.SLAVE;
923 break;
924 default:
925 log.error("Invalid role value in role reply message");
926 sw.getChannel().close();
927 return;
928 }
929
930 log.debug("Handling role reply for role {} from {}. " +
931 "Controller's role is {} ",
932 new Object[] { role, sw, Controller.this.role}
933 );
934
935 sw.deliverRoleReply(vendorMessage.getXid(), role);
936
937 boolean isActive = activeSwitches.containsKey(sw.getId());
938 if (!isActive && sw.isActive()) {
939 // Transition from SLAVE to MASTER.
940
941 if (!state.firstRoleReplyReceived ||
942 getAlwaysClearFlowsOnSwAdd()) {
943 // This is the first role-reply message we receive from
944 // this switch or roles were disabled when the switch
945 // connected:
946 // Delete all pre-existing flows for new connections to
947 // the master
948 //
949 // FIXME: Need to think more about what the test should
950 // be for when we flush the flow-table? For example,
951 // if all the controllers are temporarily in the backup
952 // role (e.g. right after a failure of the master
953 // controller) at the point the switch connects, then
954 // all of the controllers will initially connect as
955 // backup controllers and not flush the flow-table.
956 // Then when one of them is promoted to master following
957 // the master controller election the flow-table
958 // will still not be flushed because that's treated as
959 // a failover event where we don't want to flush the
960 // flow-table. The end result would be that the flow
961 // table for a newly connected switch is never
962 // flushed. Not sure how to handle that case though...
963 sw.clearAllFlowMods();
964 log.debug("First role reply from master switch {}, " +
965 "clear FlowTable to active switch list",
966 HexString.toHexString(sw.getId()));
967 }
968
969 // Some switches don't seem to update us with port
970 // status messages while in slave role.
971 readSwitchPortStateFromStorage(sw);
972
973 // Only add the switch to the active switch list if
974 // we're not in the slave role. Note that if the role
975 // attribute is null, then that means that the switch
976 // doesn't support the role request messages, so in that
977 // case we're effectively in the EQUAL role and the
978 // switch should be included in the active switch list.
979 addSwitch(sw);
980 log.debug("Added master switch {} to active switch list",
981 HexString.toHexString(sw.getId()));
982
983 }
984 else if (isActive && !sw.isActive()) {
985 // Transition from MASTER to SLAVE: remove switch
986 // from active switch list.
987 log.debug("Removed slave switch {} from active switch" +
988 " list", HexString.toHexString(sw.getId()));
989 removeSwitch(sw);
990 }
991
992 // Indicate that we have received a role reply message.
993 state.firstRoleReplyReceived = true;
994 }
995
996 protected boolean handleVendorMessage(OFVendor vendorMessage) {
997 boolean shouldHandleMessage = false;
998 int vendor = vendorMessage.getVendor();
999 switch (vendor) {
1000 case OFNiciraVendorData.NX_VENDOR_ID:
1001 OFNiciraVendorData niciraVendorData =
1002 (OFNiciraVendorData)vendorMessage.getVendorData();
1003 int dataType = niciraVendorData.getDataType();
1004 switch (dataType) {
1005 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
1006 OFRoleReplyVendorData roleReplyVendorData =
1007 (OFRoleReplyVendorData) niciraVendorData;
1008 handleRoleReplyMessage(vendorMessage,
1009 roleReplyVendorData);
1010 break;
1011 default:
1012 log.warn("Unhandled Nicira VENDOR message; " +
1013 "data type = {}", dataType);
1014 break;
1015 }
1016 break;
1017 default:
1018 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
1019 break;
1020 }
1021
1022 return shouldHandleMessage;
1023 }
1024
1025 /**
1026 * Dispatch an Openflow message from a switch to the appropriate
1027 * handler.
1028 * @param m The message to process
1029 * @throws IOException
1030 * @throws SwitchStateException
1031 */
1032 @LogMessageDocs({
1033 @LogMessageDoc(level="WARN",
1034 message="Config Reply from {switch} has " +
1035 "miss length set to {length}",
1036 explanation="The controller requires that the switch " +
1037 "use a miss length of 0xffff for correct " +
1038 "function",
1039 recommendation="Use a different switch to ensure " +
1040 "correct function"),
1041 @LogMessageDoc(level="WARN",
1042 message="Received ERROR from sw {switch} that "
1043 +"indicates roles are not supported "
1044 +"but we have received a valid "
1045 +"role reply earlier",
1046 explanation="The switch sent a confusing message to the" +
1047 "controller")
1048 })
1049 protected void processOFMessage(OFMessage m)
1050 throws IOException, SwitchStateException {
1051 boolean shouldHandleMessage = false;
1052
1053 switch (m.getType()) {
1054 case HELLO:
1055 if (log.isTraceEnabled())
1056 log.trace("HELLO from {}", sw);
1057
1058 if (state.hsState.equals(HandshakeState.START)) {
1059 state.hsState = HandshakeState.HELLO;
1060 sendHelloConfiguration();
1061 } else {
1062 throw new SwitchStateException("Unexpected HELLO from "
1063 + sw);
1064 }
1065 break;
1066 case ECHO_REQUEST:
1067 OFEchoReply reply =
1068 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
1069 reply.setXid(m.getXid());
1070 sw.write(reply, null);
1071 break;
1072 case ECHO_REPLY:
1073 break;
1074 case FEATURES_REPLY:
1075 if (log.isTraceEnabled())
1076 log.trace("Features Reply from {}", sw);
1077
1078 sw.setFeaturesReply((OFFeaturesReply) m);
1079 if (state.hsState.equals(HandshakeState.HELLO)) {
1080 sendFeatureReplyConfiguration();
1081 state.hsState = HandshakeState.FEATURES_REPLY;
1082 // uncomment to enable "dumb" switches like cbench
1083 // state.hsState = HandshakeState.READY;
1084 // addSwitch(sw);
1085 } else {
1086 // return results to rest api caller
1087 sw.deliverOFFeaturesReply(m);
1088 // update database */
1089 updateActiveSwitchInfo(sw);
1090 }
1091 break;
1092 case GET_CONFIG_REPLY:
1093 if (log.isTraceEnabled())
1094 log.trace("Get config reply from {}", sw);
1095
1096 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
1097 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
1098 throw new SwitchStateException(em);
1099 }
1100 OFGetConfigReply cr = (OFGetConfigReply) m;
1101 if (cr.getMissSendLength() == (short)0xffff) {
1102 log.trace("Config Reply from {} confirms " +
1103 "miss length set to 0xffff", sw);
1104 } else {
1105 log.warn("Config Reply from {} has " +
1106 "miss length set to {}",
1107 sw, cr.getMissSendLength() & 0xffff);
1108 }
1109 state.hasGetConfigReply = true;
1110 checkSwitchReady();
1111 break;
1112 case VENDOR:
1113 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1114 break;
1115 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001116 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001117 // TODO: we need better error handling. Especially for
1118 // request/reply style message (stats, roles) we should have
1119 // a unified way to lookup the xid in the error message.
1120 // This will probable involve rewriting the way we handle
1121 // request/reply style messages.
1122 OFError error = (OFError) m;
1123 boolean shouldLogError = true;
1124 // TODO: should we check that firstRoleReplyReceived is false,
1125 // i.e., check only whether the first request fails?
1126 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1127 boolean isBadVendorError =
1128 (error.getErrorType() == OFError.OFErrorType.
1129 OFPET_BAD_REQUEST.getValue());
1130 // We expect to receive a bad vendor error when
1131 // we're connected to a switch that doesn't support
1132 // the Nicira vendor extensions (i.e. not OVS or
1133 // derived from OVS). By protocol, it should also be
1134 // BAD_VENDOR, but too many switch implementations
1135 // get it wrong and we can already check the xid()
1136 // so we can ignore the type with confidence that this
1137 // is not a spurious error
1138 shouldLogError = !isBadVendorError;
1139 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001140 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001141 if (state.firstRoleReplyReceived && (role != null)) {
1142 log.warn("Received ERROR from sw {} that "
1143 +"indicates roles are not supported "
1144 +"but we have received a valid "
1145 +"role reply earlier", sw);
1146 }
1147 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001148 Role requestedRole =
HIGUCHI Yutaeae374d2013-06-17 10:39:42 -07001149 sw.deliverRoleRequestNotSupportedEx(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001150 synchronized(roleChanger) {
1151 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001152 //This will now never happen. The Controller's role
1153 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001154 // the switch doesn't understand role request
1155 // messages and the current controller role is
1156 // slave. We need to disconnect the switch.
1157 // @see RoleChanger for rationale
Jonathan Hart9e92c512013-03-20 16:24:44 -07001158 log.warn("Closing {} channel because controller's role " +
1159 "is SLAVE", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001160 sw.getChannel().close();
1161 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001162 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001163 log.debug("Adding switch {} because we got an error" +
1164 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001165 // Controller's role is master: add to
1166 // active
1167 // TODO: check if clearing flow table is
1168 // right choice here.
1169 // Need to clear FlowMods before we add the switch
1170 // and dispatch updates otherwise we have a race condition.
1171 // TODO: switch update is async. Won't we still have a potential
1172 // race condition?
1173 sw.clearAllFlowMods();
1174 addSwitch(sw);
1175 }
1176 }
1177 }
1178 else {
1179 // TODO: Is this the right thing to do if we receive
1180 // some other error besides a bad vendor error?
1181 // Presumably that means the switch did actually
1182 // understand the role request message, but there
1183 // was some other error from processing the message.
1184 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1185 // error code, but it doesn't look like the Nicira
1186 // role request has that. Should check OVS source
1187 // code to see if it's possible for any other errors
1188 // to be returned.
1189 // If we received an error the switch is not
1190 // in the correct role, so we need to disconnect it.
1191 // We could also resend the request but then we need to
1192 // check if there are other pending request in which
1193 // case we shouldn't resend. If we do resend we need
1194 // to make sure that the switch eventually accepts one
1195 // of our requests or disconnect the switch. This feels
1196 // cumbersome.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001197 log.debug("Closing {} channel because we recieved an " +
1198 "error other than BAD_VENDOR", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001199 sw.getChannel().close();
1200 }
1201 }
1202 // Once we support OF 1.2, we'd add code to handle it here.
1203 //if (error.getXid() == state.ofRoleRequestXid) {
1204 //}
1205 if (shouldLogError)
1206 logError(sw, error);
1207 break;
1208 case STATS_REPLY:
1209 if (state.hsState.ordinal() <
1210 HandshakeState.FEATURES_REPLY.ordinal()) {
1211 String em = "Unexpected STATS_REPLY from " + sw;
1212 throw new SwitchStateException(em);
1213 }
1214 sw.deliverStatisticsReply(m);
1215 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1216 processSwitchDescReply();
1217 }
1218 break;
1219 case PORT_STATUS:
1220 // We want to update our port state info even if we're in
1221 // the slave role, but we only want to update storage if
1222 // we're the master (or equal).
1223 boolean updateStorage = state.hsState.
1224 equals(HandshakeState.READY) &&
1225 (sw.getRole() != Role.SLAVE);
1226 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1227 shouldHandleMessage = true;
1228 break;
1229
1230 default:
1231 shouldHandleMessage = true;
1232 break;
1233 }
1234
1235 if (shouldHandleMessage) {
1236 sw.getListenerReadLock().lock();
1237 try {
1238 if (sw.isConnected()) {
1239 if (!state.hsState.equals(HandshakeState.READY)) {
1240 log.debug("Ignoring message type {} received " +
1241 "from switch {} before switch is " +
1242 "fully configured.", m.getType(), sw);
1243 }
1244 // Check if the controller is in the slave role for the
1245 // switch. If it is, then don't dispatch the message to
1246 // the listeners.
1247 // TODO: Should we dispatch messages that we expect to
1248 // receive when we're in the slave role, e.g. port
1249 // status messages? Since we're "hiding" switches from
1250 // the listeners when we're in the slave role, then it
1251 // seems a little weird to dispatch port status messages
1252 // to them. On the other hand there might be special
1253 // modules that care about all of the connected switches
1254 // and would like to receive port status notifications.
1255 else if (sw.getRole() == Role.SLAVE) {
1256 // Don't log message if it's a port status message
1257 // since we expect to receive those from the switch
1258 // and don't want to emit spurious messages.
1259 if (m.getType() != OFType.PORT_STATUS) {
1260 log.debug("Ignoring message type {} received " +
1261 "from switch {} while in the slave role.",
1262 m.getType(), sw);
1263 }
1264 } else {
1265 handleMessage(sw, m, null);
1266 }
1267 }
1268 }
1269 finally {
1270 sw.getListenerReadLock().unlock();
1271 }
1272 }
1273 }
1274 }
1275
1276 // ****************
1277 // Message handlers
1278 // ****************
1279
1280 protected void handlePortStatusMessage(IOFSwitch sw,
1281 OFPortStatus m,
1282 boolean updateStorage) {
1283 short portNumber = m.getDesc().getPortNumber();
1284 OFPhysicalPort port = m.getDesc();
1285 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001286 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1287 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001288 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001289 if (!portDown) {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001290 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1291 try {
1292 this.updates.put(update);
1293 } catch (InterruptedException e) {
1294 log.error("Failure adding update to queue", e);
1295 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001296 } else {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001297 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1298 try {
1299 this.updates.put(update);
1300 } catch (InterruptedException e) {
1301 log.error("Failure adding update to queue", e);
1302 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001303 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001304 if (updateStorage)
1305 updatePortInfo(sw, port);
1306 log.debug("Port #{} modified for {}", portNumber, sw);
1307 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1308 sw.setPort(port);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001309 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1310 try {
1311 this.updates.put(update);
1312 } catch (InterruptedException e) {
1313 log.error("Failure adding update to queue", e);
1314 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001315 if (updateStorage)
1316 updatePortInfo(sw, port);
1317 log.debug("Port #{} added for {}", portNumber, sw);
1318 } else if (m.getReason() ==
1319 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1320 sw.deletePort(portNumber);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001321 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1322 try {
1323 this.updates.put(update);
1324 } catch (InterruptedException e) {
1325 log.error("Failure adding update to queue", e);
1326 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001327 if (updateStorage)
1328 removePortInfo(sw, portNumber);
1329 log.debug("Port #{} deleted for {}", portNumber, sw);
1330 }
1331 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1332 try {
1333 this.updates.put(update);
1334 } catch (InterruptedException e) {
1335 log.error("Failure adding update to queue", e);
1336 }
1337 }
1338
1339 /**
1340 * flcontext_cache - Keep a thread local stack of contexts
1341 */
1342 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1343 new ThreadLocal <Stack<FloodlightContext>> () {
1344 @Override
1345 protected Stack<FloodlightContext> initialValue() {
1346 return new Stack<FloodlightContext>();
1347 }
1348 };
1349
1350 /**
1351 * flcontext_alloc - pop a context off the stack, if required create a new one
1352 * @return FloodlightContext
1353 */
1354 protected static FloodlightContext flcontext_alloc() {
1355 FloodlightContext flcontext = null;
1356
1357 if (flcontext_cache.get().empty()) {
1358 flcontext = new FloodlightContext();
1359 }
1360 else {
1361 flcontext = flcontext_cache.get().pop();
1362 }
1363
1364 return flcontext;
1365 }
1366
1367 /**
1368 * flcontext_free - Free the context to the current thread
1369 * @param flcontext
1370 */
1371 protected void flcontext_free(FloodlightContext flcontext) {
1372 flcontext.getStorage().clear();
1373 flcontext_cache.get().push(flcontext);
1374 }
1375
1376 /**
1377 * Handle replies to certain OFMessages, and pass others off to listeners
1378 * @param sw The switch for the message
1379 * @param m The message
1380 * @param bContext The floodlight context. If null then floodlight context would
1381 * be allocated in this function
1382 * @throws IOException
1383 */
1384 @LogMessageDocs({
1385 @LogMessageDoc(level="ERROR",
1386 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1387 " field is empty.",
1388 explanation="The switch sent an improperly-formatted PacketIn" +
1389 " message",
1390 recommendation=LogMessageDoc.CHECK_SWITCH),
1391 @LogMessageDoc(level="WARN",
1392 message="Unhandled OF Message: {} from {}",
1393 explanation="The switch sent a message not handled by " +
1394 "the controller")
1395 })
1396 protected void handleMessage(IOFSwitch sw, OFMessage m,
1397 FloodlightContext bContext)
1398 throws IOException {
1399 Ethernet eth = null;
1400
1401 switch (m.getType()) {
1402 case PACKET_IN:
1403 OFPacketIn pi = (OFPacketIn)m;
1404
1405 if (pi.getPacketData().length <= 0) {
1406 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1407 ") because the data field is empty.");
1408 return;
1409 }
1410
1411 if (Controller.ALWAYS_DECODE_ETH) {
1412 eth = new Ethernet();
1413 eth.deserialize(pi.getPacketData(), 0,
1414 pi.getPacketData().length);
1415 counterStore.updatePacketInCounters(sw, m, eth);
1416 }
1417 // fall through to default case...
1418
1419 default:
1420
1421 List<IOFMessageListener> listeners = null;
1422 if (messageListeners.containsKey(m.getType())) {
1423 listeners = messageListeners.get(m.getType()).
1424 getOrderedListeners();
1425 }
1426
1427 FloodlightContext bc = null;
1428 if (listeners != null) {
1429 // Check if floodlight context is passed from the calling
1430 // function, if so use that floodlight context, otherwise
1431 // allocate one
1432 if (bContext == null) {
1433 bc = flcontext_alloc();
1434 } else {
1435 bc = bContext;
1436 }
1437 if (eth != null) {
1438 IFloodlightProviderService.bcStore.put(bc,
1439 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1440 eth);
1441 }
1442
1443 // Get the starting time (overall and per-component) of
1444 // the processing chain for this packet if performance
1445 // monitoring is turned on
1446 pktinProcTime.bootstrap(listeners);
1447 pktinProcTime.recordStartTimePktIn();
1448 Command cmd;
1449 for (IOFMessageListener listener : listeners) {
1450 if (listener instanceof IOFSwitchFilter) {
1451 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1452 continue;
1453 }
1454 }
1455
1456 pktinProcTime.recordStartTimeComp(listener);
1457 cmd = listener.receive(sw, m, bc);
1458 pktinProcTime.recordEndTimeComp(listener);
1459
1460 if (Command.STOP.equals(cmd)) {
1461 break;
1462 }
1463 }
1464 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1465 } else {
1466 log.warn("Unhandled OF Message: {} from {}", m, sw);
1467 }
1468
1469 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1470 }
1471 }
1472
1473 /**
1474 * Log an OpenFlow error message from a switch
1475 * @param sw The switch that sent the error
1476 * @param error The error message
1477 */
1478 @LogMessageDoc(level="ERROR",
1479 message="Error {error type} {error code} from {switch}",
1480 explanation="The switch responded with an unexpected error" +
1481 "to an OpenFlow message from the controller",
1482 recommendation="This could indicate improper network operation. " +
1483 "If the problem persists restarting the switch and " +
1484 "controller may help."
1485 )
1486 protected void logError(IOFSwitch sw, OFError error) {
1487 int etint = 0xffff & error.getErrorType();
1488 if (etint < 0 || etint >= OFErrorType.values().length) {
1489 log.error("Unknown error code {} from sw {}", etint, sw);
1490 }
1491 OFErrorType et = OFErrorType.values()[etint];
1492 switch (et) {
1493 case OFPET_HELLO_FAILED:
1494 OFHelloFailedCode hfc =
1495 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1496 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1497 break;
1498 case OFPET_BAD_REQUEST:
1499 OFBadRequestCode brc =
1500 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1501 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1502 break;
1503 case OFPET_BAD_ACTION:
1504 OFBadActionCode bac =
1505 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1506 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1507 break;
1508 case OFPET_FLOW_MOD_FAILED:
1509 OFFlowModFailedCode fmfc =
1510 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1511 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1512 break;
1513 case OFPET_PORT_MOD_FAILED:
1514 OFPortModFailedCode pmfc =
1515 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1516 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1517 break;
1518 case OFPET_QUEUE_OP_FAILED:
1519 OFQueueOpFailedCode qofc =
1520 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1521 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1522 break;
1523 default:
1524 break;
1525 }
1526 }
1527
1528 /**
1529 * Add a switch to the active switch list and call the switch listeners.
1530 * This happens either when a switch first connects (and the controller is
1531 * not in the slave role) or when the role of the controller changes from
1532 * slave to master.
1533 * @param sw the switch that has been added
1534 */
1535 // TODO: need to rethink locking and the synchronous switch update.
1536 // We can / should also handle duplicate DPIDs in connectedSwitches
1537 @LogMessageDoc(level="ERROR",
1538 message="New switch added {switch} for already-added switch {switch}",
1539 explanation="A switch with the same DPID as another switch " +
1540 "connected to the controller. This can be caused by " +
1541 "multiple switches configured with the same DPID, or " +
1542 "by a switch reconnected very quickly after " +
1543 "disconnecting.",
1544 recommendation="If this happens repeatedly, it is likely there " +
1545 "are switches with duplicate DPIDs on the network. " +
1546 "Reconfigure the appropriate switches. If it happens " +
1547 "very rarely, then it is likely this is a transient " +
1548 "network problem that can be ignored."
1549 )
1550 protected void addSwitch(IOFSwitch sw) {
1551 // TODO: is it safe to modify the HashMap without holding
1552 // the old switch's lock?
1553 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1554 if (sw == oldSw) {
1555 // Note == for object equality, not .equals for value
1556 log.info("New add switch for pre-existing switch {}", sw);
1557 return;
1558 }
1559
1560 if (oldSw != null) {
1561 oldSw.getListenerWriteLock().lock();
1562 try {
1563 log.error("New switch added {} for already-added switch {}",
1564 sw, oldSw);
1565 // Set the connected flag to false to suppress calling
1566 // the listeners for this switch in processOFMessage
1567 oldSw.setConnected(false);
1568
1569 oldSw.cancelAllStatisticsReplies();
1570
1571 updateInactiveSwitchInfo(oldSw);
1572
1573 // we need to clean out old switch state definitively
1574 // before adding the new switch
1575 // FIXME: It seems not completely kosher to call the
1576 // switch listeners here. I thought one of the points of
1577 // having the asynchronous switch update mechanism was so
1578 // the addedSwitch and removedSwitch were always called
1579 // from a single thread to simplify concurrency issues
1580 // for the listener.
1581 if (switchListeners != null) {
1582 for (IOFSwitchListener listener : switchListeners) {
1583 listener.removedSwitch(oldSw);
1584 }
1585 }
1586 // will eventually trigger a removeSwitch(), which will cause
1587 // a "Not removing Switch ... already removed debug message.
1588 // TODO: Figure out a way to handle this that avoids the
1589 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001590 log.debug("Closing {} because a new IOFSwitch got added " +
1591 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001592 oldSw.getChannel().close();
1593 }
1594 finally {
1595 oldSw.getListenerWriteLock().unlock();
1596 }
1597 }
1598
1599 updateActiveSwitchInfo(sw);
1600 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1601 try {
1602 this.updates.put(update);
1603 } catch (InterruptedException e) {
1604 log.error("Failure adding update to queue", e);
1605 }
1606 }
1607
1608 /**
1609 * Remove a switch from the active switch list and call the switch listeners.
1610 * This happens either when the switch is disconnected or when the
1611 * controller's role for the switch changes from master to slave.
1612 * @param sw the switch that has been removed
1613 */
1614 protected void removeSwitch(IOFSwitch sw) {
1615 // No need to acquire the listener lock, since
1616 // this method is only called after netty has processed all
1617 // pending messages
1618 log.debug("removeSwitch: {}", sw);
1619 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1620 log.debug("Not removing switch {}; already removed", sw);
1621 return;
1622 }
1623 // We cancel all outstanding statistics replies if the switch transition
1624 // from active. In the future we might allow statistics requests
1625 // from slave controllers. Then we need to move this cancelation
1626 // to switch disconnect
1627 sw.cancelAllStatisticsReplies();
1628
1629 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1630 // here if role support is enabled. In that case if the switch is being
1631 // removed because we've been switched to being in the slave role, then I think
1632 // it's possible that the new master may have already been promoted to master
1633 // and written out the active switch state to storage. If we now execute
1634 // updateInactiveSwitchInfo we may wipe out all of the state that was
1635 // written out by the new master. Maybe need to revisit how we handle all
1636 // of the switch state that's written to storage.
1637
1638 updateInactiveSwitchInfo(sw);
1639 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1640 try {
1641 this.updates.put(update);
1642 } catch (InterruptedException e) {
1643 log.error("Failure adding update to queue", e);
1644 }
1645 }
1646
1647 // ***************
1648 // IFloodlightProvider
1649 // ***************
1650
1651 @Override
1652 public synchronized void addOFMessageListener(OFType type,
1653 IOFMessageListener listener) {
1654 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1655 messageListeners.get(type);
1656 if (ldd == null) {
1657 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1658 messageListeners.put(type, ldd);
1659 }
1660 ldd.addListener(type, listener);
1661 }
1662
1663 @Override
1664 public synchronized void removeOFMessageListener(OFType type,
1665 IOFMessageListener listener) {
1666 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1667 messageListeners.get(type);
1668 if (ldd != null) {
1669 ldd.removeListener(listener);
1670 }
1671 }
1672
1673 private void logListeners() {
1674 for (Map.Entry<OFType,
1675 ListenerDispatcher<OFType,
1676 IOFMessageListener>> entry
1677 : messageListeners.entrySet()) {
1678
1679 OFType type = entry.getKey();
1680 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1681 entry.getValue();
1682
1683 StringBuffer sb = new StringBuffer();
1684 sb.append("OFListeners for ");
1685 sb.append(type);
1686 sb.append(": ");
1687 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1688 sb.append(l.getName());
1689 sb.append(",");
1690 }
1691 log.debug(sb.toString());
1692 }
1693 }
1694
1695 public void removeOFMessageListeners(OFType type) {
1696 messageListeners.remove(type);
1697 }
1698
1699 @Override
1700 public Map<Long, IOFSwitch> getSwitches() {
1701 return Collections.unmodifiableMap(this.activeSwitches);
1702 }
1703
1704 @Override
1705 public void addOFSwitchListener(IOFSwitchListener listener) {
1706 this.switchListeners.add(listener);
1707 }
1708
1709 @Override
1710 public void removeOFSwitchListener(IOFSwitchListener listener) {
1711 this.switchListeners.remove(listener);
1712 }
1713
1714 @Override
1715 public Map<OFType, List<IOFMessageListener>> getListeners() {
1716 Map<OFType, List<IOFMessageListener>> lers =
1717 new HashMap<OFType, List<IOFMessageListener>>();
1718 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1719 messageListeners.entrySet()) {
1720 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1721 }
1722 return Collections.unmodifiableMap(lers);
1723 }
1724
1725 @Override
1726 @LogMessageDocs({
1727 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1728 "a null switch",
1729 explanation="Failed to process a message because the switch " +
1730 " is no longer connected."),
1731 @LogMessageDoc(level="ERROR",
1732 message="Error reinjecting OFMessage on switch {switch}",
1733 explanation="An I/O error occured while attempting to " +
1734 "process an OpenFlow message",
1735 recommendation=LogMessageDoc.CHECK_SWITCH)
1736 })
1737 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1738 FloodlightContext bc) {
1739 if (sw == null) {
1740 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1741 return false;
1742 }
1743
1744 // FIXME: Do we need to be able to inject messages to switches
1745 // where we're the slave controller (i.e. they're connected but
1746 // not active)?
1747 // FIXME: Don't we need synchronization logic here so we're holding
1748 // the listener read lock when we call handleMessage? After some
1749 // discussions it sounds like the right thing to do here would be to
1750 // inject the message as a netty upstream channel event so it goes
1751 // through the normal netty event processing, including being
1752 // handled
1753 if (!activeSwitches.containsKey(sw.getId())) return false;
1754
1755 try {
1756 // Pass Floodlight context to the handleMessages()
1757 handleMessage(sw, msg, bc);
1758 } catch (IOException e) {
1759 log.error("Error reinjecting OFMessage on switch {}",
1760 HexString.toHexString(sw.getId()));
1761 return false;
1762 }
1763 return true;
1764 }
1765
1766 @Override
1767 @LogMessageDoc(message="Calling System.exit",
1768 explanation="The controller is terminating")
1769 public synchronized void terminate() {
1770 log.info("Calling System.exit");
1771 System.exit(1);
1772 }
1773
1774 @Override
1775 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1776 // call the overloaded version with floodlight context set to null
1777 return injectOfMessage(sw, msg, null);
1778 }
1779
1780 @Override
1781 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1782 FloodlightContext bc) {
1783 if (log.isTraceEnabled()) {
1784 String str = OFMessage.getDataAsString(sw, m, bc);
1785 log.trace("{}", str);
1786 }
1787
1788 List<IOFMessageListener> listeners = null;
1789 if (messageListeners.containsKey(m.getType())) {
1790 listeners =
1791 messageListeners.get(m.getType()).getOrderedListeners();
1792 }
1793
1794 if (listeners != null) {
1795 for (IOFMessageListener listener : listeners) {
1796 if (listener instanceof IOFSwitchFilter) {
1797 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1798 continue;
1799 }
1800 }
1801 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1802 break;
1803 }
1804 }
1805 }
1806 }
1807
1808 @Override
1809 public BasicFactory getOFMessageFactory() {
1810 return factory;
1811 }
1812
1813 @Override
1814 public String getControllerId() {
1815 return controllerId;
1816 }
1817
1818 // **************
1819 // Initialization
1820 // **************
1821
1822 protected void updateAllInactiveSwitchInfo() {
1823 if (role == Role.SLAVE) {
1824 return;
1825 }
1826 String controllerId = getControllerId();
1827 String[] switchColumns = { SWITCH_DATAPATH_ID,
1828 SWITCH_CONTROLLER_ID,
1829 SWITCH_ACTIVE };
1830 String[] portColumns = { PORT_ID, PORT_SWITCH };
1831 IResultSet switchResultSet = null;
1832 try {
1833 OperatorPredicate op =
1834 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1835 OperatorPredicate.Operator.EQ,
1836 controllerId);
1837 switchResultSet =
1838 storageSource.executeQuery(SWITCH_TABLE_NAME,
1839 switchColumns,
1840 op, null);
1841 while (switchResultSet.next()) {
1842 IResultSet portResultSet = null;
1843 try {
1844 String datapathId =
1845 switchResultSet.getString(SWITCH_DATAPATH_ID);
1846 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1847 op = new OperatorPredicate(PORT_SWITCH,
1848 OperatorPredicate.Operator.EQ,
1849 datapathId);
1850 portResultSet =
1851 storageSource.executeQuery(PORT_TABLE_NAME,
1852 portColumns,
1853 op, null);
1854 while (portResultSet.next()) {
1855 portResultSet.deleteRow();
1856 }
1857 portResultSet.save();
1858 }
1859 finally {
1860 if (portResultSet != null)
1861 portResultSet.close();
1862 }
1863 }
1864 switchResultSet.save();
1865 }
1866 finally {
1867 if (switchResultSet != null)
1868 switchResultSet.close();
1869 }
1870 }
1871
1872 protected void updateControllerInfo() {
1873 updateAllInactiveSwitchInfo();
1874
1875 // Write out the controller info to the storage source
1876 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1877 String id = getControllerId();
1878 controllerInfo.put(CONTROLLER_ID, id);
1879 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1880 }
1881
1882 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1883 if (role == Role.SLAVE) {
1884 return;
1885 }
1886 // Obtain the row info for the switch
1887 Map<String, Object> switchInfo = new HashMap<String, Object>();
1888 String datapathIdString = sw.getStringId();
1889 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1890 String controllerId = getControllerId();
1891 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1892 Date connectedSince = sw.getConnectedSince();
1893 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1894 Channel channel = sw.getChannel();
1895 SocketAddress socketAddress = channel.getRemoteAddress();
1896 if (socketAddress != null) {
1897 String socketAddressString = socketAddress.toString();
1898 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1899 if (socketAddress instanceof InetSocketAddress) {
1900 InetSocketAddress inetSocketAddress =
1901 (InetSocketAddress)socketAddress;
1902 InetAddress inetAddress = inetSocketAddress.getAddress();
1903 String ip = inetAddress.getHostAddress();
1904 switchInfo.put(SWITCH_IP, ip);
1905 }
1906 }
1907
1908 // Write out the switch features info
1909 long capabilities = U32.f(sw.getCapabilities());
1910 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1911 long buffers = U32.f(sw.getBuffers());
1912 switchInfo.put(SWITCH_BUFFERS, buffers);
1913 long tables = U32.f(sw.getTables());
1914 switchInfo.put(SWITCH_TABLES, tables);
1915 long actions = U32.f(sw.getActions());
1916 switchInfo.put(SWITCH_ACTIONS, actions);
1917 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1918
1919 // Update the switch
1920 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1921
1922 // Update the ports
1923 for (OFPhysicalPort port: sw.getPorts()) {
1924 updatePortInfo(sw, port);
1925 }
1926 }
1927
1928 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1929 if (role == Role.SLAVE) {
1930 return;
1931 }
1932 log.debug("Update DB with inactiveSW {}", sw);
1933 // Update the controller info in the storage source to be inactive
1934 Map<String, Object> switchInfo = new HashMap<String, Object>();
1935 String datapathIdString = sw.getStringId();
1936 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1937 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1938 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1939 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1940 }
1941
1942 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1943 if (role == Role.SLAVE) {
1944 return;
1945 }
1946 String datapathIdString = sw.getStringId();
1947 Map<String, Object> portInfo = new HashMap<String, Object>();
1948 int portNumber = U16.f(port.getPortNumber());
1949 String id = datapathIdString + "|" + portNumber;
1950 portInfo.put(PORT_ID, id);
1951 portInfo.put(PORT_SWITCH, datapathIdString);
1952 portInfo.put(PORT_NUMBER, portNumber);
1953 byte[] hardwareAddress = port.getHardwareAddress();
1954 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1955 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1956 String name = port.getName();
1957 portInfo.put(PORT_NAME, name);
1958 long config = U32.f(port.getConfig());
1959 portInfo.put(PORT_CONFIG, config);
1960 long state = U32.f(port.getState());
1961 portInfo.put(PORT_STATE, state);
1962 long currentFeatures = U32.f(port.getCurrentFeatures());
1963 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1964 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1965 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1966 long supportedFeatures = U32.f(port.getSupportedFeatures());
1967 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1968 long peerFeatures = U32.f(port.getPeerFeatures());
1969 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1970 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1971 }
1972
1973 /**
1974 * Read switch port data from storage and write it into a switch object
1975 * @param sw the switch to update
1976 */
1977 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1978 OperatorPredicate op =
1979 new OperatorPredicate(PORT_SWITCH,
1980 OperatorPredicate.Operator.EQ,
1981 sw.getStringId());
1982 IResultSet portResultSet =
1983 storageSource.executeQuery(PORT_TABLE_NAME,
1984 null, op, null);
1985 //Map<Short, OFPhysicalPort> oldports =
1986 // new HashMap<Short, OFPhysicalPort>();
1987 //oldports.putAll(sw.getPorts());
1988
1989 while (portResultSet.next()) {
1990 try {
1991 OFPhysicalPort p = new OFPhysicalPort();
1992 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1993 p.setName(portResultSet.getString(PORT_NAME));
1994 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1995 p.setState((int)portResultSet.getLong(PORT_STATE));
1996 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1997 p.setHardwareAddress(HexString.fromHexString(portMac));
1998 p.setCurrentFeatures((int)portResultSet.
1999 getLong(PORT_CURRENT_FEATURES));
2000 p.setAdvertisedFeatures((int)portResultSet.
2001 getLong(PORT_ADVERTISED_FEATURES));
2002 p.setSupportedFeatures((int)portResultSet.
2003 getLong(PORT_SUPPORTED_FEATURES));
2004 p.setPeerFeatures((int)portResultSet.
2005 getLong(PORT_PEER_FEATURES));
2006 //oldports.remove(Short.valueOf(p.getPortNumber()));
2007 sw.setPort(p);
2008 } catch (NullPointerException e) {
2009 // ignore
2010 }
2011 }
2012 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
2013 try {
2014 this.updates.put(update);
2015 } catch (InterruptedException e) {
2016 log.error("Failure adding update to queue", e);
2017 }
2018 }
2019
2020 protected void removePortInfo(IOFSwitch sw, short portNumber) {
2021 if (role == Role.SLAVE) {
2022 return;
2023 }
2024 String datapathIdString = sw.getStringId();
2025 String id = datapathIdString + "|" + portNumber;
2026 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
2027 }
2028
2029 /**
2030 * Sets the initial role based on properties in the config params.
2031 * It looks for two different properties.
2032 * If the "role" property is specified then the value should be
2033 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
2034 * controller is set to the specified value. If the "role" property
2035 * is not specified then it looks next for the "role.path" property.
2036 * In this case the value should be the path to a property file in
2037 * the file system that contains a property called "floodlight.role"
2038 * which can be one of the values listed above for the "role" property.
2039 * The idea behind the "role.path" mechanism is that you have some
2040 * separate heartbeat and master controller election algorithm that
2041 * determines the role of the controller. When a role transition happens,
2042 * it updates the current role in the file specified by the "role.path"
2043 * file. Then if floodlight restarts for some reason it can get the
2044 * correct current role of the controller from the file.
2045 * @param configParams The config params for the FloodlightProvider service
2046 * @return A valid role if role information is specified in the
2047 * config params, otherwise null
2048 */
2049 @LogMessageDocs({
2050 @LogMessageDoc(message="Controller role set to {role}",
2051 explanation="Setting the initial HA role to "),
2052 @LogMessageDoc(level="ERROR",
2053 message="Invalid current role value: {role}",
2054 explanation="An invalid HA role value was read from the " +
2055 "properties file",
2056 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2057 })
2058 protected Role getInitialRole(Map<String, String> configParams) {
2059 Role role = null;
2060 String roleString = configParams.get("role");
2061 if (roleString == null) {
2062 String rolePath = configParams.get("rolepath");
2063 if (rolePath != null) {
2064 Properties properties = new Properties();
2065 try {
2066 properties.load(new FileInputStream(rolePath));
2067 roleString = properties.getProperty("floodlight.role");
2068 }
2069 catch (IOException exc) {
2070 // Don't treat it as an error if the file specified by the
2071 // rolepath property doesn't exist. This lets us enable the
2072 // HA mechanism by just creating/setting the floodlight.role
2073 // property in that file without having to modify the
2074 // floodlight properties.
2075 }
2076 }
2077 }
2078
2079 if (roleString != null) {
2080 // Canonicalize the string to the form used for the enum constants
2081 roleString = roleString.trim().toUpperCase();
2082 try {
2083 role = Role.valueOf(roleString);
2084 }
2085 catch (IllegalArgumentException exc) {
2086 log.error("Invalid current role value: {}", roleString);
2087 }
2088 }
2089
2090 log.info("Controller role set to {}", role);
2091
2092 return role;
2093 }
2094
2095 /**
2096 * Tell controller that we're ready to accept switches loop
2097 * @throws IOException
2098 */
2099 @LogMessageDocs({
2100 @LogMessageDoc(message="Listening for switch connections on {address}",
2101 explanation="The controller is ready and listening for new" +
2102 " switch connections"),
2103 @LogMessageDoc(message="Storage exception in controller " +
2104 "updates loop; terminating process",
2105 explanation=ERROR_DATABASE,
2106 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2107 @LogMessageDoc(level="ERROR",
2108 message="Exception in controller updates loop",
2109 explanation="Failed to dispatch controller event",
2110 recommendation=LogMessageDoc.GENERIC_ACTION)
2111 })
2112 public void run() {
2113 if (log.isDebugEnabled()) {
2114 logListeners();
2115 }
2116
2117 try {
2118 final ServerBootstrap bootstrap = createServerBootStrap();
2119
2120 bootstrap.setOption("reuseAddr", true);
2121 bootstrap.setOption("child.keepAlive", true);
2122 bootstrap.setOption("child.tcpNoDelay", true);
2123 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2124
2125 ChannelPipelineFactory pfact =
2126 new OpenflowPipelineFactory(this, null);
2127 bootstrap.setPipelineFactory(pfact);
2128 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2129 final ChannelGroup cg = new DefaultChannelGroup();
2130 cg.add(bootstrap.bind(sa));
2131
2132 log.info("Listening for switch connections on {}", sa);
2133 } catch (Exception e) {
2134 throw new RuntimeException(e);
2135 }
2136
2137 // main loop
2138 while (true) {
2139 try {
2140 IUpdate update = updates.take();
2141 update.dispatch();
2142 } catch (InterruptedException e) {
2143 return;
2144 } catch (StorageException e) {
2145 log.error("Storage exception in controller " +
2146 "updates loop; terminating process", e);
2147 return;
2148 } catch (Exception e) {
2149 log.error("Exception in controller updates loop", e);
2150 }
2151 }
2152 }
2153
2154 private ServerBootstrap createServerBootStrap() {
2155 if (workerThreads == 0) {
2156 return new ServerBootstrap(
2157 new NioServerSocketChannelFactory(
2158 Executors.newCachedThreadPool(),
2159 Executors.newCachedThreadPool()));
2160 } else {
2161 return new ServerBootstrap(
2162 new NioServerSocketChannelFactory(
2163 Executors.newCachedThreadPool(),
2164 Executors.newCachedThreadPool(), workerThreads));
2165 }
2166 }
2167
2168 public void setConfigParams(Map<String, String> configParams) {
2169 String ofPort = configParams.get("openflowport");
2170 if (ofPort != null) {
2171 this.openFlowPort = Integer.parseInt(ofPort);
2172 }
2173 log.debug("OpenFlow port set to {}", this.openFlowPort);
2174 String threads = configParams.get("workerthreads");
2175 if (threads != null) {
2176 this.workerThreads = Integer.parseInt(threads);
2177 }
2178 log.debug("Number of worker threads set to {}", this.workerThreads);
2179 String controllerId = configParams.get("controllerid");
2180 if (controllerId != null) {
2181 this.controllerId = controllerId;
2182 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002183 else {
2184 //Try to get the hostname of the machine and use that for controller ID
2185 try {
2186 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2187 this.controllerId = hostname;
2188 } catch (UnknownHostException e) {
2189 // Can't get hostname, we'll just use the default
2190 }
2191 }
2192
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002193 log.debug("ControllerId set to {}", this.controllerId);
2194 }
2195
2196 private void initVendorMessages() {
2197 // Configure openflowj to be able to parse the role request/reply
2198 // vendor messages.
2199 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2200 OFNiciraVendorData.NX_VENDOR_ID, 4);
2201 OFVendorId.registerVendorId(niciraVendorId);
2202 OFBasicVendorDataType roleRequestVendorData =
2203 new OFBasicVendorDataType(
2204 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2205 OFRoleRequestVendorData.getInstantiable());
2206 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2207 OFBasicVendorDataType roleReplyVendorData =
2208 new OFBasicVendorDataType(
2209 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2210 OFRoleReplyVendorData.getInstantiable());
2211 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2212 }
2213
2214 /**
2215 * Initialize internal data structures
2216 */
2217 public void init(Map<String, String> configParams) {
2218 // These data structures are initialized here because other
2219 // module's startUp() might be called before ours
2220 this.messageListeners =
2221 new ConcurrentHashMap<OFType,
2222 ListenerDispatcher<OFType,
2223 IOFMessageListener>>();
2224 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2225 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2226 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2227 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2228 this.controllerNodeIPsCache = new HashMap<String, String>();
2229 this.updates = new LinkedBlockingQueue<IUpdate>();
2230 this.factory = new BasicFactory();
2231 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2232 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002233 //this.role = getInitialRole(configParams);
2234 //Set the controller's role to MASTER so it always tries to do role requests.
2235 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002236 this.roleChanger = new RoleChanger();
2237 initVendorMessages();
2238 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002239 }
2240
2241 /**
2242 * Startup all of the controller's components
2243 */
2244 @LogMessageDoc(message="Waiting for storage source",
2245 explanation="The system database is not yet ready",
2246 recommendation="If this message persists, this indicates " +
2247 "that the system database has failed to start. " +
2248 LogMessageDoc.CHECK_CONTROLLER)
2249 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002250 try {
2251 registryService.registerController(controllerId);
2252 } catch (RegistryException e2) {
2253 log.warn("Registry service error: {}", e2.getMessage());
2254 }
2255
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002256 // Create the table names we use
2257 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2258 storageSource.createTable(SWITCH_TABLE_NAME, null);
2259 storageSource.createTable(PORT_TABLE_NAME, null);
2260 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2261 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2262 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2263 CONTROLLER_ID);
2264 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2265 SWITCH_DATAPATH_ID);
2266 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2267 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2268 CONTROLLER_INTERFACE_ID);
2269 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2270
2271 while (true) {
2272 try {
2273 updateControllerInfo();
2274 break;
2275 }
2276 catch (StorageException e) {
2277 log.info("Waiting for storage source");
2278 try {
2279 Thread.sleep(1000);
2280 } catch (InterruptedException e1) {
2281 }
2282 }
2283 }
2284
2285 // Add our REST API
2286 restApi.addRestletRoutable(new CoreWebRoutable());
2287 }
2288
2289 @Override
2290 public void addInfoProvider(String type, IInfoProvider provider) {
2291 if (!providerMap.containsKey(type)) {
2292 providerMap.put(type, new ArrayList<IInfoProvider>());
2293 }
2294 providerMap.get(type).add(provider);
2295 }
2296
2297 @Override
2298 public void removeInfoProvider(String type, IInfoProvider provider) {
2299 if (!providerMap.containsKey(type)) {
2300 log.debug("Provider type {} doesn't exist.", type);
2301 return;
2302 }
2303
2304 providerMap.get(type).remove(provider);
2305 }
2306
2307 public Map<String, Object> getControllerInfo(String type) {
2308 if (!providerMap.containsKey(type)) return null;
2309
2310 Map<String, Object> result = new LinkedHashMap<String, Object>();
2311 for (IInfoProvider provider : providerMap.get(type)) {
2312 result.putAll(provider.getInfo(type));
2313 }
2314
2315 return result;
2316 }
2317
2318 @Override
2319 public void addHAListener(IHAListener listener) {
2320 this.haListeners.add(listener);
2321 }
2322
2323 @Override
2324 public void removeHAListener(IHAListener listener) {
2325 this.haListeners.remove(listener);
2326 }
2327
2328
2329 /**
2330 * Handle changes to the controller nodes IPs and dispatch update.
2331 */
2332 @SuppressWarnings("unchecked")
2333 protected void handleControllerNodeIPChanges() {
2334 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2335 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2336 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2337 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2338 CONTROLLER_INTERFACE_TYPE,
2339 CONTROLLER_INTERFACE_NUMBER,
2340 CONTROLLER_INTERFACE_DISCOVERED_IP };
2341 synchronized(controllerNodeIPsCache) {
2342 // We currently assume that interface Ethernet0 is the relevant
2343 // controller interface. Might change.
2344 // We could (should?) implement this using
2345 // predicates, but creating the individual and compound predicate
2346 // seems more overhead then just checking every row. Particularly,
2347 // since the number of rows is small and changes infrequent
2348 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2349 colNames,null, null);
2350 while (res.next()) {
2351 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2352 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2353 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2354 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2355 String curIP = controllerNodeIPsCache.get(controllerID);
2356
2357 curControllerNodeIPs.put(controllerID, discoveredIP);
2358 if (curIP == null) {
2359 // new controller node IP
2360 addedControllerNodeIPs.put(controllerID, discoveredIP);
2361 }
2362 else if (curIP != discoveredIP) {
2363 // IP changed
2364 removedControllerNodeIPs.put(controllerID, curIP);
2365 addedControllerNodeIPs.put(controllerID, discoveredIP);
2366 }
2367 }
2368 }
2369 // Now figure out if rows have been deleted. We can't use the
2370 // rowKeys from rowsDeleted directly, since the tables primary
2371 // key is a compound that we can't disassemble
2372 Set<String> curEntries = curControllerNodeIPs.keySet();
2373 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2374 removedEntries.removeAll(curEntries);
2375 for (String removedControllerID : removedEntries)
2376 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2377 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2378 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2379 curControllerNodeIPs, addedControllerNodeIPs,
2380 removedControllerNodeIPs);
2381 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2382 try {
2383 this.updates.put(update);
2384 } catch (InterruptedException e) {
2385 log.error("Failure adding update to queue", e);
2386 }
2387 }
2388 }
2389 }
2390
2391 @Override
2392 public Map<String, String> getControllerNodeIPs() {
2393 // We return a copy of the mapping so we can guarantee that
2394 // the mapping return is the same as one that will be (or was)
2395 // dispatched to IHAListeners
2396 HashMap<String,String> retval = new HashMap<String,String>();
2397 synchronized(controllerNodeIPsCache) {
2398 retval.putAll(controllerNodeIPsCache);
2399 }
2400 return retval;
2401 }
2402
2403 @Override
2404 public void rowsModified(String tableName, Set<Object> rowKeys) {
2405 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2406 handleControllerNodeIPChanges();
2407 }
2408
2409 }
2410
2411 @Override
2412 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2413 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2414 handleControllerNodeIPChanges();
2415 }
2416 }
2417
2418 @Override
2419 public long getSystemStartTime() {
2420 return (this.systemStartTime);
2421 }
2422
2423 @Override
2424 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2425 this.alwaysClearFlowsOnSwAdd = value;
2426 }
2427
2428 public boolean getAlwaysClearFlowsOnSwAdd() {
2429 return this.alwaysClearFlowsOnSwAdd;
2430 }
2431}