blob: e40f24ade7c6f591faaaa2d48406f2fb5f9a6870 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080058import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
61import net.floodlightcontroller.core.annotations.LogMessageDoc;
62import net.floodlightcontroller.core.annotations.LogMessageDocs;
63import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
64import net.floodlightcontroller.core.util.ListenerDispatcher;
65import net.floodlightcontroller.core.web.CoreWebRoutable;
66import net.floodlightcontroller.counter.ICounterStoreService;
67import net.floodlightcontroller.packet.Ethernet;
68import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
69import net.floodlightcontroller.restserver.IRestApiService;
70import net.floodlightcontroller.storage.IResultSet;
71import net.floodlightcontroller.storage.IStorageSourceListener;
72import net.floodlightcontroller.storage.IStorageSourceService;
73import net.floodlightcontroller.storage.OperatorPredicate;
74import net.floodlightcontroller.storage.StorageException;
75import net.floodlightcontroller.threadpool.IThreadPoolService;
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -070076import net.onrc.onos.ofcontroller.core.IOFSwitchPortListener;
HIGUCHI Yuta20514902013-06-12 11:24:16 -070077import net.onrc.onos.ofcontroller.core.INetMapTopologyService.ITopoRouteService;
HIGUCHI Yuta60a10142013-06-14 15:50:10 -070078import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080079import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080080import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080081import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080082
83import org.jboss.netty.bootstrap.ServerBootstrap;
84import org.jboss.netty.buffer.ChannelBuffer;
85import org.jboss.netty.buffer.ChannelBuffers;
86import org.jboss.netty.channel.Channel;
87import org.jboss.netty.channel.ChannelHandlerContext;
88import org.jboss.netty.channel.ChannelPipelineFactory;
89import org.jboss.netty.channel.ChannelStateEvent;
90import org.jboss.netty.channel.ChannelUpstreamHandler;
91import org.jboss.netty.channel.Channels;
92import org.jboss.netty.channel.ExceptionEvent;
93import org.jboss.netty.channel.MessageEvent;
94import org.jboss.netty.channel.group.ChannelGroup;
95import org.jboss.netty.channel.group.DefaultChannelGroup;
96import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
97import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
98import org.jboss.netty.handler.timeout.IdleStateEvent;
99import org.jboss.netty.handler.timeout.ReadTimeoutException;
100import org.openflow.protocol.OFEchoReply;
101import org.openflow.protocol.OFError;
102import org.openflow.protocol.OFError.OFBadActionCode;
103import org.openflow.protocol.OFError.OFBadRequestCode;
104import org.openflow.protocol.OFError.OFErrorType;
105import org.openflow.protocol.OFError.OFFlowModFailedCode;
106import org.openflow.protocol.OFError.OFHelloFailedCode;
107import org.openflow.protocol.OFError.OFPortModFailedCode;
108import org.openflow.protocol.OFError.OFQueueOpFailedCode;
109import org.openflow.protocol.OFFeaturesReply;
110import org.openflow.protocol.OFGetConfigReply;
111import org.openflow.protocol.OFMessage;
112import org.openflow.protocol.OFPacketIn;
113import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800114import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800115import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800116import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800117import org.openflow.protocol.OFPortStatus.OFPortReason;
118import org.openflow.protocol.OFSetConfig;
119import org.openflow.protocol.OFStatisticsRequest;
120import org.openflow.protocol.OFSwitchConfig;
121import org.openflow.protocol.OFType;
122import org.openflow.protocol.OFVendor;
123import org.openflow.protocol.factory.BasicFactory;
124import org.openflow.protocol.factory.MessageParseException;
125import org.openflow.protocol.statistics.OFDescriptionStatistics;
126import org.openflow.protocol.statistics.OFStatistics;
127import org.openflow.protocol.statistics.OFStatisticsType;
128import org.openflow.protocol.vendor.OFBasicVendorDataType;
129import org.openflow.protocol.vendor.OFBasicVendorId;
130import org.openflow.protocol.vendor.OFVendorId;
131import org.openflow.util.HexString;
132import org.openflow.util.U16;
133import org.openflow.util.U32;
134import org.openflow.vendor.nicira.OFNiciraVendorData;
135import org.openflow.vendor.nicira.OFRoleReplyVendorData;
136import org.openflow.vendor.nicira.OFRoleRequestVendorData;
137import org.openflow.vendor.nicira.OFRoleVendorData;
138import org.slf4j.Logger;
139import org.slf4j.LoggerFactory;
140
141
142/**
143 * The main controller class. Handles all setup and network listeners
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700144 *
145 * Extensions made by ONOS are:
146 * - Detailed Port event: PORTCHANGED -> {PORTCHANGED, PORTADDED, PORTREMOVED}
147 * Available as net.onrc.onos.ofcontroller.core.IOFSwitchPortListener
148 * - Distributed ownership control of switch through RegistryService(IControllerRegistryService)
149 * - Register ONOS services. (IFlowService, ITopoRouteService, IControllerRegistryService)
150 * - Additional DEBUG logs
151 * - Try using hostname as controller ID, when ID was not explicitly given.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800152 */
153public class Controller implements IFloodlightProviderService,
154 IStorageSourceListener {
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700155
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800156 protected static Logger log = LoggerFactory.getLogger(Controller.class);
157
158 private static final String ERROR_DATABASE =
159 "The controller could not communicate with the system database.";
160
161 protected BasicFactory factory;
162 protected ConcurrentMap<OFType,
163 ListenerDispatcher<OFType,IOFMessageListener>>
164 messageListeners;
165 // The activeSwitches map contains only those switches that are actively
166 // being controlled by us -- it doesn't contain switches that are
167 // in the slave role
168 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
169 // connectedSwitches contains all connected switches, including ones where
170 // we're a slave controller. We need to keep track of them so that we can
171 // send role request messages to switches when our role changes to master
172 // We add a switch to this set after it successfully completes the
173 // handshake. Access to this Set needs to be synchronized with roleChanger
174 protected HashSet<OFSwitchImpl> connectedSwitches;
175
176 // The controllerNodeIPsCache maps Controller IDs to their IP address.
177 // It's only used by handleControllerNodeIPsChanged
178 protected HashMap<String, String> controllerNodeIPsCache;
179
180 protected Set<IOFSwitchListener> switchListeners;
181 protected Set<IHAListener> haListeners;
182 protected Map<String, List<IInfoProvider>> providerMap;
183 protected BlockingQueue<IUpdate> updates;
184
185 // Module dependencies
186 protected IRestApiService restApi;
187 protected ICounterStoreService counterStore = null;
188 protected IStorageSourceService storageSource;
189 protected IPktInProcessingTimeService pktinProcTime;
190 protected IThreadPoolService threadPool;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800191 protected IFlowService flowService;
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800192 protected ITopoRouteService topoRouteService;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800193 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800194
195 // Configuration options
196 protected int openFlowPort = 6633;
197 protected int workerThreads = 0;
198 // The id for this controller node. Should be unique for each controller
199 // node in a controller cluster.
200 protected String controllerId = "localhost";
201
202 // The current role of the controller.
203 // If the controller isn't configured to support roles, then this is null.
204 protected Role role;
205 // A helper that handles sending and timeout handling for role requests
206 protected RoleChanger roleChanger;
207
208 // Start time of the controller
209 protected long systemStartTime;
210
211 // Flag to always flush flow table on switch reconnect (HA or otherwise)
212 protected boolean alwaysClearFlowsOnSwAdd = false;
213
214 // Storage table names
215 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
216 protected static final String CONTROLLER_ID = "id";
217
218 protected static final String SWITCH_TABLE_NAME = "controller_switch";
219 protected static final String SWITCH_DATAPATH_ID = "dpid";
220 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
221 protected static final String SWITCH_IP = "ip";
222 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
223 protected static final String SWITCH_ACTIVE = "active";
224 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
225 protected static final String SWITCH_CAPABILITIES = "capabilities";
226 protected static final String SWITCH_BUFFERS = "buffers";
227 protected static final String SWITCH_TABLES = "tables";
228 protected static final String SWITCH_ACTIONS = "actions";
229
230 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
231 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
232
233 protected static final String PORT_TABLE_NAME = "controller_port";
234 protected static final String PORT_ID = "id";
235 protected static final String PORT_SWITCH = "switch_id";
236 protected static final String PORT_NUMBER = "number";
237 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
238 protected static final String PORT_NAME = "name";
239 protected static final String PORT_CONFIG = "config";
240 protected static final String PORT_STATE = "state";
241 protected static final String PORT_CURRENT_FEATURES = "current_features";
242 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
243 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
244 protected static final String PORT_PEER_FEATURES = "peer_features";
245
246 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
247 protected static final String CONTROLLER_INTERFACE_ID = "id";
248 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
249 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
250 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
251 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
252
253
254
255 // Perf. related configuration
256 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
257 protected static final int BATCH_MAX_SIZE = 100;
258 protected static final boolean ALWAYS_DECODE_ETH = true;
259
260 /**
261 * Updates handled by the main loop
262 */
263 protected interface IUpdate {
264 /**
265 * Calls the appropriate listeners
266 */
267 public void dispatch();
268 }
269 public enum SwitchUpdateType {
270 ADDED,
271 REMOVED,
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700272 PORTCHANGED,
273 PORTADDED,
274 PORTREMOVED
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800275 }
276 /**
277 * Update message indicating a switch was added or removed
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700278 * ONOS: This message extended to indicate Port add or removed event.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800279 */
280 protected class SwitchUpdate implements IUpdate {
281 public IOFSwitch sw;
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700282 public OFPhysicalPort port; // Added by ONOS
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800283 public SwitchUpdateType switchUpdateType;
284 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
285 this.sw = sw;
286 this.switchUpdateType = switchUpdateType;
287 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700288 public SwitchUpdate(IOFSwitch sw, OFPhysicalPort port, SwitchUpdateType switchUpdateType) {
289 this.sw = sw;
290 this.port = port;
291 this.switchUpdateType = switchUpdateType;
292 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800293 public void dispatch() {
294 if (log.isTraceEnabled()) {
295 log.trace("Dispatching switch update {} {}",
296 sw, switchUpdateType);
297 }
298 if (switchListeners != null) {
299 for (IOFSwitchListener listener : switchListeners) {
300 switch(switchUpdateType) {
301 case ADDED:
302 listener.addedSwitch(sw);
303 break;
304 case REMOVED:
305 listener.removedSwitch(sw);
306 break;
307 case PORTCHANGED:
308 listener.switchPortChanged(sw.getId());
309 break;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700310 case PORTADDED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700311 if (listener instanceof IOFSwitchPortListener) {
312 ((IOFSwitchPortListener) listener).switchPortAdded(sw.getId(), port);
313 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700314 break;
315 case PORTREMOVED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700316 if (listener instanceof IOFSwitchPortListener) {
317 ((IOFSwitchPortListener) listener).switchPortRemoved(sw.getId(), port);
318 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700319 break;
320 default:
321 break;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800322 }
323 }
324 }
325 }
326 }
327
328 /**
329 * Update message indicating controller's role has changed
330 */
331 protected class HARoleUpdate implements IUpdate {
332 public Role oldRole;
333 public Role newRole;
334 public HARoleUpdate(Role newRole, Role oldRole) {
335 this.oldRole = oldRole;
336 this.newRole = newRole;
337 }
338 public void dispatch() {
339 // Make sure that old and new roles are different.
340 if (oldRole == newRole) {
341 if (log.isTraceEnabled()) {
342 log.trace("HA role update ignored as the old and " +
343 "new roles are the same. newRole = {}" +
344 "oldRole = {}", newRole, oldRole);
345 }
346 return;
347 }
348 if (log.isTraceEnabled()) {
349 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
350 newRole, oldRole);
351 }
352 if (haListeners != null) {
353 for (IHAListener listener : haListeners) {
354 listener.roleChanged(oldRole, newRole);
355 }
356 }
357 }
358 }
359
360 /**
361 * Update message indicating
362 * IPs of controllers in controller cluster have changed.
363 */
364 protected class HAControllerNodeIPUpdate implements IUpdate {
365 public Map<String,String> curControllerNodeIPs;
366 public Map<String,String> addedControllerNodeIPs;
367 public Map<String,String> removedControllerNodeIPs;
368 public HAControllerNodeIPUpdate(
369 HashMap<String,String> curControllerNodeIPs,
370 HashMap<String,String> addedControllerNodeIPs,
371 HashMap<String,String> removedControllerNodeIPs) {
372 this.curControllerNodeIPs = curControllerNodeIPs;
373 this.addedControllerNodeIPs = addedControllerNodeIPs;
374 this.removedControllerNodeIPs = removedControllerNodeIPs;
375 }
376 public void dispatch() {
377 if (log.isTraceEnabled()) {
378 log.trace("Dispatching HA Controller Node IP update "
379 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
380 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
381 removedControllerNodeIPs }
382 );
383 }
384 if (haListeners != null) {
385 for (IHAListener listener: haListeners) {
386 listener.controllerNodeIPsChanged(curControllerNodeIPs,
387 addedControllerNodeIPs, removedControllerNodeIPs);
388 }
389 }
390 }
391 }
392
393 // ***************
394 // Getters/Setters
395 // ***************
396
397 public void setStorageSourceService(IStorageSourceService storageSource) {
398 this.storageSource = storageSource;
399 }
400
401 public void setCounterStore(ICounterStoreService counterStore) {
402 this.counterStore = counterStore;
403 }
404
405 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
406 this.pktinProcTime = pits;
407 }
408
409 public void setRestApiService(IRestApiService restApi) {
410 this.restApi = restApi;
411 }
412
413 public void setThreadPoolService(IThreadPoolService tp) {
414 this.threadPool = tp;
415 }
416
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800417 public void setFlowService(IFlowService serviceImpl) {
418 this.flowService = serviceImpl;
419 }
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800420
421 public void setTopoRouteService(ITopoRouteService serviceImpl) {
422 this.topoRouteService = serviceImpl;
423 }
Jonathan Hartc2e95ee2013-02-22 15:25:11 -0800424
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800425 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800426 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800427 }
428
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800429 @Override
430 public Role getRole() {
431 synchronized(roleChanger) {
432 return role;
433 }
434 }
435
436 @Override
437 public void setRole(Role role) {
438 if (role == null) throw new NullPointerException("Role can not be null.");
439 if (role == Role.MASTER && this.role == Role.SLAVE) {
440 // Reset db state to Inactive for all switches.
441 updateAllInactiveSwitchInfo();
442 }
443
444 // Need to synchronize to ensure a reliable ordering on role request
445 // messages send and to ensure the list of connected switches is stable
446 // RoleChanger will handle the actual sending of the message and
447 // timeout handling
448 // @see RoleChanger
449 synchronized(roleChanger) {
450 if (role.equals(this.role)) {
451 log.debug("Ignoring role change: role is already {}", role);
452 return;
453 }
454
455 Role oldRole = this.role;
456 this.role = role;
457
458 log.debug("Submitting role change request to role {}", role);
459 roleChanger.submitRequest(connectedSwitches, role);
460
461 // Enqueue an update for our listeners.
462 try {
463 this.updates.put(new HARoleUpdate(role, oldRole));
464 } catch (InterruptedException e) {
465 log.error("Failure adding update to queue", e);
466 }
467 }
468 }
469
470
471
472 // **********************
473 // ChannelUpstreamHandler
474 // **********************
475
476 /**
477 * Return a new channel handler for processing a switch connections
478 * @param state The channel state object for the connection
479 * @return the new channel handler
480 */
481 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
482 return new OFChannelHandler(state);
483 }
484
Jonathan Hartcc957a02013-02-26 10:39:04 -0800485 protected class RoleChangeCallback implements ControlChangeCallback {
486 @Override
487 public void controlChanged(long dpid, boolean hasControl) {
488 log.info("Role change callback for switch {}, hasControl {}",
489 HexString.toHexString(dpid), hasControl);
490
491 synchronized(roleChanger){
492 OFSwitchImpl sw = null;
493 for (OFSwitchImpl connectedSw : connectedSwitches){
494 if (connectedSw.getId() == dpid){
495 sw = connectedSw;
496 break;
497 }
498 }
499 if (sw == null){
500 log.warn("Switch {} not found in connected switches",
501 HexString.toHexString(dpid));
502 return;
503 }
504
505 Role role = null;
506
Pankaj Berde01939e92013-03-08 14:38:27 -0800507 /*
508 * issue #229
509 * Cannot rely on sw.getRole() as it can be behind due to pending
510 * role changes in the queue. Just submit it and late the RoleChanger
511 * handle duplicates.
512 */
513
514 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800515 role = Role.MASTER;
516 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800517 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800518 role = Role.SLAVE;
519 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800520
521 log.debug("Sending role request {} msg to {}", role, sw);
522 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
523 swList.add(sw);
524 roleChanger.submitRequest(swList, role);
525
Jonathan Hartcc957a02013-02-26 10:39:04 -0800526 }
527
528 }
529 }
530
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800531 /**
532 * Channel handler deals with the switch connection and dispatches
533 * switch messages to the appropriate locations.
534 * @author readams
535 */
536 protected class OFChannelHandler
537 extends IdleStateAwareChannelUpstreamHandler {
538 protected OFSwitchImpl sw;
539 protected OFChannelState state;
540
541 public OFChannelHandler(OFChannelState state) {
542 this.state = state;
543 }
544
545 @Override
546 @LogMessageDoc(message="New switch connection from {ip address}",
547 explanation="A new switch has connected from the " +
548 "specified IP address")
549 public void channelConnected(ChannelHandlerContext ctx,
550 ChannelStateEvent e) throws Exception {
551 log.info("New switch connection from {}",
552 e.getChannel().getRemoteAddress());
553
554 sw = new OFSwitchImpl();
555 sw.setChannel(e.getChannel());
556 sw.setFloodlightProvider(Controller.this);
557 sw.setThreadPoolService(threadPool);
558
559 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
560 msglist.add(factory.getMessage(OFType.HELLO));
561 e.getChannel().write(msglist);
562
563 }
564
565 @Override
566 @LogMessageDoc(message="Disconnected switch {switch information}",
567 explanation="The specified switch has disconnected.")
568 public void channelDisconnected(ChannelHandlerContext ctx,
569 ChannelStateEvent e) throws Exception {
570 if (sw != null && state.hsState == HandshakeState.READY) {
571 if (activeSwitches.containsKey(sw.getId())) {
572 // It's safe to call removeSwitch even though the map might
573 // not contain this particular switch but another with the
574 // same DPID
575 removeSwitch(sw);
576 }
577 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700578 if (controlRequested) {
579 registryService.releaseControl(sw.getId());
580 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800581 connectedSwitches.remove(sw);
582 }
583 sw.setConnected(false);
584 }
585 log.info("Disconnected switch {}", sw);
586 }
587
588 @Override
589 @LogMessageDocs({
590 @LogMessageDoc(level="ERROR",
591 message="Disconnecting switch {switch} due to read timeout",
592 explanation="The connected switch has failed to send any " +
593 "messages or respond to echo requests",
594 recommendation=LogMessageDoc.CHECK_SWITCH),
595 @LogMessageDoc(level="ERROR",
596 message="Disconnecting switch {switch}: failed to " +
597 "complete handshake",
598 explanation="The switch did not respond correctly " +
599 "to handshake messages",
600 recommendation=LogMessageDoc.CHECK_SWITCH),
601 @LogMessageDoc(level="ERROR",
602 message="Disconnecting switch {switch} due to IO Error: {}",
603 explanation="There was an error communicating with the switch",
604 recommendation=LogMessageDoc.CHECK_SWITCH),
605 @LogMessageDoc(level="ERROR",
606 message="Disconnecting switch {switch} due to switch " +
607 "state error: {error}",
608 explanation="The switch sent an unexpected message",
609 recommendation=LogMessageDoc.CHECK_SWITCH),
610 @LogMessageDoc(level="ERROR",
611 message="Disconnecting switch {switch} due to " +
612 "message parse failure",
613 explanation="Could not parse a message from the switch",
614 recommendation=LogMessageDoc.CHECK_SWITCH),
615 @LogMessageDoc(level="ERROR",
616 message="Terminating controller due to storage exception",
617 explanation=ERROR_DATABASE,
618 recommendation=LogMessageDoc.CHECK_CONTROLLER),
619 @LogMessageDoc(level="ERROR",
620 message="Could not process message: queue full",
621 explanation="OpenFlow messages are arriving faster than " +
622 " the controller can process them.",
623 recommendation=LogMessageDoc.CHECK_CONTROLLER),
624 @LogMessageDoc(level="ERROR",
625 message="Error while processing message " +
626 "from switch {switch} {cause}",
627 explanation="An error occurred processing the switch message",
628 recommendation=LogMessageDoc.GENERIC_ACTION)
629 })
630 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
631 throws Exception {
632 if (e.getCause() instanceof ReadTimeoutException) {
633 // switch timeout
634 log.error("Disconnecting switch {} due to read timeout", sw);
635 ctx.getChannel().close();
636 } else if (e.getCause() instanceof HandshakeTimeoutException) {
637 log.error("Disconnecting switch {}: failed to complete handshake",
638 sw);
639 ctx.getChannel().close();
640 } else if (e.getCause() instanceof ClosedChannelException) {
641 //log.warn("Channel for sw {} already closed", sw);
642 } else if (e.getCause() instanceof IOException) {
643 log.error("Disconnecting switch {} due to IO Error: {}",
644 sw, e.getCause().getMessage());
645 ctx.getChannel().close();
646 } else if (e.getCause() instanceof SwitchStateException) {
647 log.error("Disconnecting switch {} due to switch state error: {}",
648 sw, e.getCause().getMessage());
649 ctx.getChannel().close();
650 } else if (e.getCause() instanceof MessageParseException) {
651 log.error("Disconnecting switch " + sw +
652 " due to message parse failure",
653 e.getCause());
654 ctx.getChannel().close();
655 } else if (e.getCause() instanceof StorageException) {
656 log.error("Terminating controller due to storage exception",
657 e.getCause());
658 terminate();
659 } else if (e.getCause() instanceof RejectedExecutionException) {
660 log.warn("Could not process message: queue full");
661 } else {
662 log.error("Error while processing message from switch " + sw,
663 e.getCause());
664 ctx.getChannel().close();
665 }
666 }
667
668 @Override
669 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
670 throws Exception {
671 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
672 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
673 e.getChannel().write(msglist);
674 }
675
676 @Override
677 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
678 throws Exception {
679 if (e.getMessage() instanceof List) {
680 @SuppressWarnings("unchecked")
681 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
682
683 for (OFMessage ofm : msglist) {
684 try {
685 processOFMessage(ofm);
686 }
687 catch (Exception ex) {
688 // We are the last handler in the stream, so run the
689 // exception through the channel again by passing in
690 // ctx.getChannel().
691 Channels.fireExceptionCaught(ctx.getChannel(), ex);
692 }
693 }
694
695 // Flush all flow-mods/packet-out generated from this "train"
696 OFSwitchImpl.flush_all();
697 }
698 }
699
700 /**
701 * Process the request for the switch description
702 */
703 @LogMessageDoc(level="ERROR",
704 message="Exception in reading description " +
705 " during handshake {exception}",
706 explanation="Could not process the switch description string",
707 recommendation=LogMessageDoc.CHECK_SWITCH)
708 void processSwitchDescReply() {
709 try {
710 // Read description, if it has been updated
711 @SuppressWarnings("unchecked")
712 Future<List<OFStatistics>> desc_future =
713 (Future<List<OFStatistics>>)sw.
714 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
715 List<OFStatistics> values =
716 desc_future.get(0, TimeUnit.MILLISECONDS);
717 if (values != null) {
718 OFDescriptionStatistics description =
719 new OFDescriptionStatistics();
720 ChannelBuffer data =
721 ChannelBuffers.buffer(description.getLength());
722 for (OFStatistics f : values) {
723 f.writeTo(data);
724 description.readFrom(data);
725 break; // SHOULD be a list of length 1
726 }
727 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
728 description);
729 sw.setSwitchProperties(description);
730 data = null;
731
732 // At this time, also set other switch properties from storage
733 boolean is_core_switch = false;
734 IResultSet resultSet = null;
735 try {
736 String swid = sw.getStringId();
737 resultSet =
738 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
739 for (Iterator<IResultSet> it =
740 resultSet.iterator(); it.hasNext();) {
741 // In case of multiple rows, use the status
742 // in last row?
743 Map<String, Object> row = it.next().getRow();
744 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
745 if (log.isDebugEnabled()) {
746 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
747 "config for switch={}, is-core={}",
748 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
749 }
750 String ics =
751 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
752 is_core_switch = ics.equals("true");
753 }
754 }
755 }
756 finally {
757 if (resultSet != null)
758 resultSet.close();
759 }
760 if (is_core_switch) {
761 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
762 new Boolean(true));
763 }
764 }
765 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
766 state.hasDescription = true;
767 checkSwitchReady();
768 }
769 catch (InterruptedException ex) {
770 // Ignore
771 }
772 catch (TimeoutException ex) {
773 // Ignore
774 } catch (Exception ex) {
775 log.error("Exception in reading description " +
776 " during handshake", ex);
777 }
778 }
779
780 /**
781 * Send initial switch setup information that we need before adding
782 * the switch
783 * @throws IOException
784 */
785 void sendHelloConfiguration() throws IOException {
786 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700787 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800788 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
789 }
790
791 /**
792 * Send the configuration requests we can only do after we have
793 * the features reply
794 * @throws IOException
795 */
796 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700797 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800798 // Ensure we receive the full packet via PacketIn
799 OFSetConfig config = (OFSetConfig) factory
800 .getMessage(OFType.SET_CONFIG);
801 config.setMissSendLength((short) 0xffff)
802 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
803 sw.write(config, null);
804 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
805 null);
806
807 // Get Description to set switch-specific flags
808 OFStatisticsRequest req = new OFStatisticsRequest();
809 req.setStatisticType(OFStatisticsType.DESC);
810 req.setLengthU(req.getLengthU());
811 Future<List<OFStatistics>> dfuture =
812 sw.getStatistics(req);
813 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
814 dfuture);
815
816 }
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700817
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700818 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800819 protected void checkSwitchReady() {
820 if (state.hsState == HandshakeState.FEATURES_REPLY &&
821 state.hasDescription && state.hasGetConfigReply) {
822
823 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700824 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800825
826 synchronized(roleChanger) {
827 // We need to keep track of all of the switches that are connected
828 // to the controller, in any role, so that we can later send the
829 // role request messages when the controller role changes.
830 // We need to be synchronized while doing this: we must not
831 // send a another role request to the connectedSwitches until
832 // we were able to add this new switch to connectedSwitches
833 // *and* send the current role to the new switch.
834 connectedSwitches.add(sw);
835
836 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800837 //Put the switch in SLAVE mode until we know we have control
838 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
839 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
840 swList.add(sw);
841 roleChanger.submitRequest(swList, Role.SLAVE);
842
Jonathan Hartcc957a02013-02-26 10:39:04 -0800843 //Request control of the switch from the global registry
844 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700845 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800846 registryService.requestControl(sw.getId(),
847 new RoleChangeCallback());
848 } catch (RegistryException e) {
849 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700850 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800851 }
852
Jonathan Hart97801ac2013-02-26 14:29:16 -0800853
Jonathan Hartcc957a02013-02-26 10:39:04 -0800854
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800855 // Send a role request if role support is enabled for the controller
856 // This is a probe that we'll use to determine if the switch
857 // actually supports the role request message. If it does we'll
858 // get back a role reply message. If it doesn't we'll get back an
859 // OFError message.
860 // If role is MASTER we will promote switch to active
861 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800862 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800863 log.debug("This controller's role is {}, " +
864 "sending initial role request msg to {}",
865 role, sw);
866 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
867 swList.add(sw);
868 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800869 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800870 }
871 else {
872 // Role supported not enabled on controller (for now)
873 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800874 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800875 "not sending role request msg to {}",
876 role, sw);
877 // Need to clear FlowMods before we add the switch
878 // and dispatch updates otherwise we have a race condition.
879 sw.clearAllFlowMods();
880 addSwitch(sw);
881 state.firstRoleReplyReceived = true;
882 }
883 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700884 if (!controlRequested) {
885 // yield to allow other thread(s) to release control
886 try {
887 Thread.sleep(10);
888 } catch (InterruptedException e) {
889 // Ignore interruptions
890 }
891 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700892 log.debug("Closing {} because we weren't able to request control " +
893 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700894 sw.channel.close();
895 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800896 }
897 }
898
899 /* Handle a role reply message we received from the switch. Since
900 * netty serializes message dispatch we don't need to synchronize
901 * against other receive operations from the same switch, so no need
902 * to synchronize addSwitch(), removeSwitch() operations from the same
903 * connection.
904 * FIXME: However, when a switch with the same DPID connects we do
905 * need some synchronization. However, handling switches with same
906 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
907 * removedSwitch notification):1
908 *
909 */
910 @LogMessageDoc(level="ERROR",
911 message="Invalid role value in role reply message",
912 explanation="Was unable to set the HA role (master or slave) " +
913 "for the controller.",
914 recommendation=LogMessageDoc.CHECK_CONTROLLER)
915 protected void handleRoleReplyMessage(OFVendor vendorMessage,
916 OFRoleReplyVendorData roleReplyVendorData) {
917 // Map from the role code in the message to our role enum
918 int nxRole = roleReplyVendorData.getRole();
919 Role role = null;
920 switch (nxRole) {
921 case OFRoleVendorData.NX_ROLE_OTHER:
922 role = Role.EQUAL;
923 break;
924 case OFRoleVendorData.NX_ROLE_MASTER:
925 role = Role.MASTER;
926 break;
927 case OFRoleVendorData.NX_ROLE_SLAVE:
928 role = Role.SLAVE;
929 break;
930 default:
931 log.error("Invalid role value in role reply message");
932 sw.getChannel().close();
933 return;
934 }
935
936 log.debug("Handling role reply for role {} from {}. " +
937 "Controller's role is {} ",
938 new Object[] { role, sw, Controller.this.role}
939 );
940
941 sw.deliverRoleReply(vendorMessage.getXid(), role);
942
943 boolean isActive = activeSwitches.containsKey(sw.getId());
944 if (!isActive && sw.isActive()) {
945 // Transition from SLAVE to MASTER.
946
947 if (!state.firstRoleReplyReceived ||
948 getAlwaysClearFlowsOnSwAdd()) {
949 // This is the first role-reply message we receive from
950 // this switch or roles were disabled when the switch
951 // connected:
952 // Delete all pre-existing flows for new connections to
953 // the master
954 //
955 // FIXME: Need to think more about what the test should
956 // be for when we flush the flow-table? For example,
957 // if all the controllers are temporarily in the backup
958 // role (e.g. right after a failure of the master
959 // controller) at the point the switch connects, then
960 // all of the controllers will initially connect as
961 // backup controllers and not flush the flow-table.
962 // Then when one of them is promoted to master following
963 // the master controller election the flow-table
964 // will still not be flushed because that's treated as
965 // a failover event where we don't want to flush the
966 // flow-table. The end result would be that the flow
967 // table for a newly connected switch is never
968 // flushed. Not sure how to handle that case though...
969 sw.clearAllFlowMods();
970 log.debug("First role reply from master switch {}, " +
971 "clear FlowTable to active switch list",
972 HexString.toHexString(sw.getId()));
973 }
974
975 // Some switches don't seem to update us with port
976 // status messages while in slave role.
977 readSwitchPortStateFromStorage(sw);
978
979 // Only add the switch to the active switch list if
980 // we're not in the slave role. Note that if the role
981 // attribute is null, then that means that the switch
982 // doesn't support the role request messages, so in that
983 // case we're effectively in the EQUAL role and the
984 // switch should be included in the active switch list.
985 addSwitch(sw);
986 log.debug("Added master switch {} to active switch list",
987 HexString.toHexString(sw.getId()));
988
989 }
990 else if (isActive && !sw.isActive()) {
991 // Transition from MASTER to SLAVE: remove switch
992 // from active switch list.
993 log.debug("Removed slave switch {} from active switch" +
994 " list", HexString.toHexString(sw.getId()));
995 removeSwitch(sw);
996 }
997
998 // Indicate that we have received a role reply message.
999 state.firstRoleReplyReceived = true;
1000 }
1001
1002 protected boolean handleVendorMessage(OFVendor vendorMessage) {
1003 boolean shouldHandleMessage = false;
1004 int vendor = vendorMessage.getVendor();
1005 switch (vendor) {
1006 case OFNiciraVendorData.NX_VENDOR_ID:
1007 OFNiciraVendorData niciraVendorData =
1008 (OFNiciraVendorData)vendorMessage.getVendorData();
1009 int dataType = niciraVendorData.getDataType();
1010 switch (dataType) {
1011 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
1012 OFRoleReplyVendorData roleReplyVendorData =
1013 (OFRoleReplyVendorData) niciraVendorData;
1014 handleRoleReplyMessage(vendorMessage,
1015 roleReplyVendorData);
1016 break;
1017 default:
1018 log.warn("Unhandled Nicira VENDOR message; " +
1019 "data type = {}", dataType);
1020 break;
1021 }
1022 break;
1023 default:
1024 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
1025 break;
1026 }
1027
1028 return shouldHandleMessage;
1029 }
1030
1031 /**
1032 * Dispatch an Openflow message from a switch to the appropriate
1033 * handler.
1034 * @param m The message to process
1035 * @throws IOException
1036 * @throws SwitchStateException
1037 */
1038 @LogMessageDocs({
1039 @LogMessageDoc(level="WARN",
1040 message="Config Reply from {switch} has " +
1041 "miss length set to {length}",
1042 explanation="The controller requires that the switch " +
1043 "use a miss length of 0xffff for correct " +
1044 "function",
1045 recommendation="Use a different switch to ensure " +
1046 "correct function"),
1047 @LogMessageDoc(level="WARN",
1048 message="Received ERROR from sw {switch} that "
1049 +"indicates roles are not supported "
1050 +"but we have received a valid "
1051 +"role reply earlier",
1052 explanation="The switch sent a confusing message to the" +
1053 "controller")
1054 })
1055 protected void processOFMessage(OFMessage m)
1056 throws IOException, SwitchStateException {
1057 boolean shouldHandleMessage = false;
1058
1059 switch (m.getType()) {
1060 case HELLO:
1061 if (log.isTraceEnabled())
1062 log.trace("HELLO from {}", sw);
1063
1064 if (state.hsState.equals(HandshakeState.START)) {
1065 state.hsState = HandshakeState.HELLO;
1066 sendHelloConfiguration();
1067 } else {
1068 throw new SwitchStateException("Unexpected HELLO from "
1069 + sw);
1070 }
1071 break;
1072 case ECHO_REQUEST:
1073 OFEchoReply reply =
1074 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
1075 reply.setXid(m.getXid());
1076 sw.write(reply, null);
1077 break;
1078 case ECHO_REPLY:
1079 break;
1080 case FEATURES_REPLY:
1081 if (log.isTraceEnabled())
1082 log.trace("Features Reply from {}", sw);
1083
1084 sw.setFeaturesReply((OFFeaturesReply) m);
1085 if (state.hsState.equals(HandshakeState.HELLO)) {
1086 sendFeatureReplyConfiguration();
1087 state.hsState = HandshakeState.FEATURES_REPLY;
1088 // uncomment to enable "dumb" switches like cbench
1089 // state.hsState = HandshakeState.READY;
1090 // addSwitch(sw);
1091 } else {
1092 // return results to rest api caller
1093 sw.deliverOFFeaturesReply(m);
1094 // update database */
1095 updateActiveSwitchInfo(sw);
1096 }
1097 break;
1098 case GET_CONFIG_REPLY:
1099 if (log.isTraceEnabled())
1100 log.trace("Get config reply from {}", sw);
1101
1102 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
1103 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
1104 throw new SwitchStateException(em);
1105 }
1106 OFGetConfigReply cr = (OFGetConfigReply) m;
1107 if (cr.getMissSendLength() == (short)0xffff) {
1108 log.trace("Config Reply from {} confirms " +
1109 "miss length set to 0xffff", sw);
1110 } else {
1111 log.warn("Config Reply from {} has " +
1112 "miss length set to {}",
1113 sw, cr.getMissSendLength() & 0xffff);
1114 }
1115 state.hasGetConfigReply = true;
1116 checkSwitchReady();
1117 break;
1118 case VENDOR:
1119 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1120 break;
1121 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001122 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001123 // TODO: we need better error handling. Especially for
1124 // request/reply style message (stats, roles) we should have
1125 // a unified way to lookup the xid in the error message.
1126 // This will probable involve rewriting the way we handle
1127 // request/reply style messages.
1128 OFError error = (OFError) m;
1129 boolean shouldLogError = true;
1130 // TODO: should we check that firstRoleReplyReceived is false,
1131 // i.e., check only whether the first request fails?
1132 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1133 boolean isBadVendorError =
1134 (error.getErrorType() == OFError.OFErrorType.
1135 OFPET_BAD_REQUEST.getValue());
1136 // We expect to receive a bad vendor error when
1137 // we're connected to a switch that doesn't support
1138 // the Nicira vendor extensions (i.e. not OVS or
1139 // derived from OVS). By protocol, it should also be
1140 // BAD_VENDOR, but too many switch implementations
1141 // get it wrong and we can already check the xid()
1142 // so we can ignore the type with confidence that this
1143 // is not a spurious error
1144 shouldLogError = !isBadVendorError;
1145 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001146 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001147 if (state.firstRoleReplyReceived && (role != null)) {
1148 log.warn("Received ERROR from sw {} that "
1149 +"indicates roles are not supported "
1150 +"but we have received a valid "
1151 +"role reply earlier", sw);
1152 }
1153 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001154 Role requestedRole =
HIGUCHI Yutaeae374d2013-06-17 10:39:42 -07001155 sw.deliverRoleRequestNotSupportedEx(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001156 synchronized(roleChanger) {
1157 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001158 //This will now never happen. The Controller's role
1159 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001160 // the switch doesn't understand role request
1161 // messages and the current controller role is
1162 // slave. We need to disconnect the switch.
1163 // @see RoleChanger for rationale
Jonathan Hart9e92c512013-03-20 16:24:44 -07001164 log.warn("Closing {} channel because controller's role " +
1165 "is SLAVE", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001166 sw.getChannel().close();
1167 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001168 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001169 log.debug("Adding switch {} because we got an error" +
1170 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001171 // Controller's role is master: add to
1172 // active
1173 // TODO: check if clearing flow table is
1174 // right choice here.
1175 // Need to clear FlowMods before we add the switch
1176 // and dispatch updates otherwise we have a race condition.
1177 // TODO: switch update is async. Won't we still have a potential
1178 // race condition?
1179 sw.clearAllFlowMods();
1180 addSwitch(sw);
1181 }
1182 }
1183 }
1184 else {
1185 // TODO: Is this the right thing to do if we receive
1186 // some other error besides a bad vendor error?
1187 // Presumably that means the switch did actually
1188 // understand the role request message, but there
1189 // was some other error from processing the message.
1190 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1191 // error code, but it doesn't look like the Nicira
1192 // role request has that. Should check OVS source
1193 // code to see if it's possible for any other errors
1194 // to be returned.
1195 // If we received an error the switch is not
1196 // in the correct role, so we need to disconnect it.
1197 // We could also resend the request but then we need to
1198 // check if there are other pending request in which
1199 // case we shouldn't resend. If we do resend we need
1200 // to make sure that the switch eventually accepts one
1201 // of our requests or disconnect the switch. This feels
1202 // cumbersome.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001203 log.debug("Closing {} channel because we recieved an " +
1204 "error other than BAD_VENDOR", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001205 sw.getChannel().close();
1206 }
1207 }
1208 // Once we support OF 1.2, we'd add code to handle it here.
1209 //if (error.getXid() == state.ofRoleRequestXid) {
1210 //}
1211 if (shouldLogError)
1212 logError(sw, error);
1213 break;
1214 case STATS_REPLY:
1215 if (state.hsState.ordinal() <
1216 HandshakeState.FEATURES_REPLY.ordinal()) {
1217 String em = "Unexpected STATS_REPLY from " + sw;
1218 throw new SwitchStateException(em);
1219 }
1220 sw.deliverStatisticsReply(m);
1221 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1222 processSwitchDescReply();
1223 }
1224 break;
1225 case PORT_STATUS:
1226 // We want to update our port state info even if we're in
1227 // the slave role, but we only want to update storage if
1228 // we're the master (or equal).
1229 boolean updateStorage = state.hsState.
1230 equals(HandshakeState.READY) &&
1231 (sw.getRole() != Role.SLAVE);
1232 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1233 shouldHandleMessage = true;
1234 break;
1235
1236 default:
1237 shouldHandleMessage = true;
1238 break;
1239 }
1240
1241 if (shouldHandleMessage) {
1242 sw.getListenerReadLock().lock();
1243 try {
1244 if (sw.isConnected()) {
1245 if (!state.hsState.equals(HandshakeState.READY)) {
1246 log.debug("Ignoring message type {} received " +
1247 "from switch {} before switch is " +
1248 "fully configured.", m.getType(), sw);
1249 }
1250 // Check if the controller is in the slave role for the
1251 // switch. If it is, then don't dispatch the message to
1252 // the listeners.
1253 // TODO: Should we dispatch messages that we expect to
1254 // receive when we're in the slave role, e.g. port
1255 // status messages? Since we're "hiding" switches from
1256 // the listeners when we're in the slave role, then it
1257 // seems a little weird to dispatch port status messages
1258 // to them. On the other hand there might be special
1259 // modules that care about all of the connected switches
1260 // and would like to receive port status notifications.
1261 else if (sw.getRole() == Role.SLAVE) {
1262 // Don't log message if it's a port status message
1263 // since we expect to receive those from the switch
1264 // and don't want to emit spurious messages.
1265 if (m.getType() != OFType.PORT_STATUS) {
1266 log.debug("Ignoring message type {} received " +
1267 "from switch {} while in the slave role.",
1268 m.getType(), sw);
1269 }
1270 } else {
1271 handleMessage(sw, m, null);
1272 }
1273 }
1274 }
1275 finally {
1276 sw.getListenerReadLock().unlock();
1277 }
1278 }
1279 }
1280 }
1281
1282 // ****************
1283 // Message handlers
1284 // ****************
1285
1286 protected void handlePortStatusMessage(IOFSwitch sw,
1287 OFPortStatus m,
1288 boolean updateStorage) {
1289 short portNumber = m.getDesc().getPortNumber();
1290 OFPhysicalPort port = m.getDesc();
1291 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001292 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1293 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001294 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001295 if (!portDown) {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001296 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1297 try {
1298 this.updates.put(update);
1299 } catch (InterruptedException e) {
1300 log.error("Failure adding update to queue", e);
1301 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001302 } else {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001303 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1304 try {
1305 this.updates.put(update);
1306 } catch (InterruptedException e) {
1307 log.error("Failure adding update to queue", e);
1308 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001309 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001310 if (updateStorage)
1311 updatePortInfo(sw, port);
1312 log.debug("Port #{} modified for {}", portNumber, sw);
1313 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1314 sw.setPort(port);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001315 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1316 try {
1317 this.updates.put(update);
1318 } catch (InterruptedException e) {
1319 log.error("Failure adding update to queue", e);
1320 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001321 if (updateStorage)
1322 updatePortInfo(sw, port);
1323 log.debug("Port #{} added for {}", portNumber, sw);
1324 } else if (m.getReason() ==
1325 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1326 sw.deletePort(portNumber);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001327 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1328 try {
1329 this.updates.put(update);
1330 } catch (InterruptedException e) {
1331 log.error("Failure adding update to queue", e);
1332 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001333 if (updateStorage)
1334 removePortInfo(sw, portNumber);
1335 log.debug("Port #{} deleted for {}", portNumber, sw);
1336 }
1337 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1338 try {
1339 this.updates.put(update);
1340 } catch (InterruptedException e) {
1341 log.error("Failure adding update to queue", e);
1342 }
1343 }
1344
1345 /**
1346 * flcontext_cache - Keep a thread local stack of contexts
1347 */
1348 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1349 new ThreadLocal <Stack<FloodlightContext>> () {
1350 @Override
1351 protected Stack<FloodlightContext> initialValue() {
1352 return new Stack<FloodlightContext>();
1353 }
1354 };
1355
1356 /**
1357 * flcontext_alloc - pop a context off the stack, if required create a new one
1358 * @return FloodlightContext
1359 */
1360 protected static FloodlightContext flcontext_alloc() {
1361 FloodlightContext flcontext = null;
1362
1363 if (flcontext_cache.get().empty()) {
1364 flcontext = new FloodlightContext();
1365 }
1366 else {
1367 flcontext = flcontext_cache.get().pop();
1368 }
1369
1370 return flcontext;
1371 }
1372
1373 /**
1374 * flcontext_free - Free the context to the current thread
1375 * @param flcontext
1376 */
1377 protected void flcontext_free(FloodlightContext flcontext) {
1378 flcontext.getStorage().clear();
1379 flcontext_cache.get().push(flcontext);
1380 }
1381
1382 /**
1383 * Handle replies to certain OFMessages, and pass others off to listeners
1384 * @param sw The switch for the message
1385 * @param m The message
1386 * @param bContext The floodlight context. If null then floodlight context would
1387 * be allocated in this function
1388 * @throws IOException
1389 */
1390 @LogMessageDocs({
1391 @LogMessageDoc(level="ERROR",
1392 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1393 " field is empty.",
1394 explanation="The switch sent an improperly-formatted PacketIn" +
1395 " message",
1396 recommendation=LogMessageDoc.CHECK_SWITCH),
1397 @LogMessageDoc(level="WARN",
1398 message="Unhandled OF Message: {} from {}",
1399 explanation="The switch sent a message not handled by " +
1400 "the controller")
1401 })
1402 protected void handleMessage(IOFSwitch sw, OFMessage m,
1403 FloodlightContext bContext)
1404 throws IOException {
1405 Ethernet eth = null;
1406
1407 switch (m.getType()) {
1408 case PACKET_IN:
1409 OFPacketIn pi = (OFPacketIn)m;
1410
1411 if (pi.getPacketData().length <= 0) {
1412 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1413 ") because the data field is empty.");
1414 return;
1415 }
1416
1417 if (Controller.ALWAYS_DECODE_ETH) {
1418 eth = new Ethernet();
1419 eth.deserialize(pi.getPacketData(), 0,
1420 pi.getPacketData().length);
1421 counterStore.updatePacketInCounters(sw, m, eth);
1422 }
1423 // fall through to default case...
1424
1425 default:
1426
1427 List<IOFMessageListener> listeners = null;
1428 if (messageListeners.containsKey(m.getType())) {
1429 listeners = messageListeners.get(m.getType()).
1430 getOrderedListeners();
1431 }
1432
1433 FloodlightContext bc = null;
1434 if (listeners != null) {
1435 // Check if floodlight context is passed from the calling
1436 // function, if so use that floodlight context, otherwise
1437 // allocate one
1438 if (bContext == null) {
1439 bc = flcontext_alloc();
1440 } else {
1441 bc = bContext;
1442 }
1443 if (eth != null) {
1444 IFloodlightProviderService.bcStore.put(bc,
1445 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1446 eth);
1447 }
1448
1449 // Get the starting time (overall and per-component) of
1450 // the processing chain for this packet if performance
1451 // monitoring is turned on
1452 pktinProcTime.bootstrap(listeners);
1453 pktinProcTime.recordStartTimePktIn();
1454 Command cmd;
1455 for (IOFMessageListener listener : listeners) {
1456 if (listener instanceof IOFSwitchFilter) {
1457 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1458 continue;
1459 }
1460 }
1461
1462 pktinProcTime.recordStartTimeComp(listener);
1463 cmd = listener.receive(sw, m, bc);
1464 pktinProcTime.recordEndTimeComp(listener);
1465
1466 if (Command.STOP.equals(cmd)) {
1467 break;
1468 }
1469 }
1470 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1471 } else {
1472 log.warn("Unhandled OF Message: {} from {}", m, sw);
1473 }
1474
1475 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1476 }
1477 }
1478
1479 /**
1480 * Log an OpenFlow error message from a switch
1481 * @param sw The switch that sent the error
1482 * @param error The error message
1483 */
1484 @LogMessageDoc(level="ERROR",
1485 message="Error {error type} {error code} from {switch}",
1486 explanation="The switch responded with an unexpected error" +
1487 "to an OpenFlow message from the controller",
1488 recommendation="This could indicate improper network operation. " +
1489 "If the problem persists restarting the switch and " +
1490 "controller may help."
1491 )
1492 protected void logError(IOFSwitch sw, OFError error) {
1493 int etint = 0xffff & error.getErrorType();
1494 if (etint < 0 || etint >= OFErrorType.values().length) {
1495 log.error("Unknown error code {} from sw {}", etint, sw);
1496 }
1497 OFErrorType et = OFErrorType.values()[etint];
1498 switch (et) {
1499 case OFPET_HELLO_FAILED:
1500 OFHelloFailedCode hfc =
1501 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1502 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1503 break;
1504 case OFPET_BAD_REQUEST:
1505 OFBadRequestCode brc =
1506 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1507 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1508 break;
1509 case OFPET_BAD_ACTION:
1510 OFBadActionCode bac =
1511 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1512 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1513 break;
1514 case OFPET_FLOW_MOD_FAILED:
1515 OFFlowModFailedCode fmfc =
1516 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1517 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1518 break;
1519 case OFPET_PORT_MOD_FAILED:
1520 OFPortModFailedCode pmfc =
1521 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1522 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1523 break;
1524 case OFPET_QUEUE_OP_FAILED:
1525 OFQueueOpFailedCode qofc =
1526 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1527 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1528 break;
1529 default:
1530 break;
1531 }
1532 }
1533
1534 /**
1535 * Add a switch to the active switch list and call the switch listeners.
1536 * This happens either when a switch first connects (and the controller is
1537 * not in the slave role) or when the role of the controller changes from
1538 * slave to master.
1539 * @param sw the switch that has been added
1540 */
1541 // TODO: need to rethink locking and the synchronous switch update.
1542 // We can / should also handle duplicate DPIDs in connectedSwitches
1543 @LogMessageDoc(level="ERROR",
1544 message="New switch added {switch} for already-added switch {switch}",
1545 explanation="A switch with the same DPID as another switch " +
1546 "connected to the controller. This can be caused by " +
1547 "multiple switches configured with the same DPID, or " +
1548 "by a switch reconnected very quickly after " +
1549 "disconnecting.",
1550 recommendation="If this happens repeatedly, it is likely there " +
1551 "are switches with duplicate DPIDs on the network. " +
1552 "Reconfigure the appropriate switches. If it happens " +
1553 "very rarely, then it is likely this is a transient " +
1554 "network problem that can be ignored."
1555 )
1556 protected void addSwitch(IOFSwitch sw) {
1557 // TODO: is it safe to modify the HashMap without holding
1558 // the old switch's lock?
1559 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1560 if (sw == oldSw) {
1561 // Note == for object equality, not .equals for value
1562 log.info("New add switch for pre-existing switch {}", sw);
1563 return;
1564 }
1565
1566 if (oldSw != null) {
1567 oldSw.getListenerWriteLock().lock();
1568 try {
1569 log.error("New switch added {} for already-added switch {}",
1570 sw, oldSw);
1571 // Set the connected flag to false to suppress calling
1572 // the listeners for this switch in processOFMessage
1573 oldSw.setConnected(false);
1574
1575 oldSw.cancelAllStatisticsReplies();
1576
1577 updateInactiveSwitchInfo(oldSw);
1578
1579 // we need to clean out old switch state definitively
1580 // before adding the new switch
1581 // FIXME: It seems not completely kosher to call the
1582 // switch listeners here. I thought one of the points of
1583 // having the asynchronous switch update mechanism was so
1584 // the addedSwitch and removedSwitch were always called
1585 // from a single thread to simplify concurrency issues
1586 // for the listener.
1587 if (switchListeners != null) {
1588 for (IOFSwitchListener listener : switchListeners) {
1589 listener.removedSwitch(oldSw);
1590 }
1591 }
1592 // will eventually trigger a removeSwitch(), which will cause
1593 // a "Not removing Switch ... already removed debug message.
1594 // TODO: Figure out a way to handle this that avoids the
1595 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001596 log.debug("Closing {} because a new IOFSwitch got added " +
1597 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001598 oldSw.getChannel().close();
1599 }
1600 finally {
1601 oldSw.getListenerWriteLock().unlock();
1602 }
1603 }
1604
1605 updateActiveSwitchInfo(sw);
1606 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1607 try {
1608 this.updates.put(update);
1609 } catch (InterruptedException e) {
1610 log.error("Failure adding update to queue", e);
1611 }
1612 }
1613
1614 /**
1615 * Remove a switch from the active switch list and call the switch listeners.
1616 * This happens either when the switch is disconnected or when the
1617 * controller's role for the switch changes from master to slave.
1618 * @param sw the switch that has been removed
1619 */
1620 protected void removeSwitch(IOFSwitch sw) {
1621 // No need to acquire the listener lock, since
1622 // this method is only called after netty has processed all
1623 // pending messages
1624 log.debug("removeSwitch: {}", sw);
1625 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1626 log.debug("Not removing switch {}; already removed", sw);
1627 return;
1628 }
1629 // We cancel all outstanding statistics replies if the switch transition
1630 // from active. In the future we might allow statistics requests
1631 // from slave controllers. Then we need to move this cancelation
1632 // to switch disconnect
1633 sw.cancelAllStatisticsReplies();
1634
1635 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1636 // here if role support is enabled. In that case if the switch is being
1637 // removed because we've been switched to being in the slave role, then I think
1638 // it's possible that the new master may have already been promoted to master
1639 // and written out the active switch state to storage. If we now execute
1640 // updateInactiveSwitchInfo we may wipe out all of the state that was
1641 // written out by the new master. Maybe need to revisit how we handle all
1642 // of the switch state that's written to storage.
1643
1644 updateInactiveSwitchInfo(sw);
1645 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1646 try {
1647 this.updates.put(update);
1648 } catch (InterruptedException e) {
1649 log.error("Failure adding update to queue", e);
1650 }
1651 }
1652
1653 // ***************
1654 // IFloodlightProvider
1655 // ***************
1656
1657 @Override
1658 public synchronized void addOFMessageListener(OFType type,
1659 IOFMessageListener listener) {
1660 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1661 messageListeners.get(type);
1662 if (ldd == null) {
1663 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1664 messageListeners.put(type, ldd);
1665 }
1666 ldd.addListener(type, listener);
1667 }
1668
1669 @Override
1670 public synchronized void removeOFMessageListener(OFType type,
1671 IOFMessageListener listener) {
1672 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1673 messageListeners.get(type);
1674 if (ldd != null) {
1675 ldd.removeListener(listener);
1676 }
1677 }
1678
1679 private void logListeners() {
1680 for (Map.Entry<OFType,
1681 ListenerDispatcher<OFType,
1682 IOFMessageListener>> entry
1683 : messageListeners.entrySet()) {
1684
1685 OFType type = entry.getKey();
1686 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1687 entry.getValue();
1688
1689 StringBuffer sb = new StringBuffer();
1690 sb.append("OFListeners for ");
1691 sb.append(type);
1692 sb.append(": ");
1693 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1694 sb.append(l.getName());
1695 sb.append(",");
1696 }
1697 log.debug(sb.toString());
1698 }
1699 }
1700
1701 public void removeOFMessageListeners(OFType type) {
1702 messageListeners.remove(type);
1703 }
1704
1705 @Override
1706 public Map<Long, IOFSwitch> getSwitches() {
1707 return Collections.unmodifiableMap(this.activeSwitches);
1708 }
1709
1710 @Override
1711 public void addOFSwitchListener(IOFSwitchListener listener) {
1712 this.switchListeners.add(listener);
1713 }
1714
1715 @Override
1716 public void removeOFSwitchListener(IOFSwitchListener listener) {
1717 this.switchListeners.remove(listener);
1718 }
1719
1720 @Override
1721 public Map<OFType, List<IOFMessageListener>> getListeners() {
1722 Map<OFType, List<IOFMessageListener>> lers =
1723 new HashMap<OFType, List<IOFMessageListener>>();
1724 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1725 messageListeners.entrySet()) {
1726 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1727 }
1728 return Collections.unmodifiableMap(lers);
1729 }
1730
1731 @Override
1732 @LogMessageDocs({
1733 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1734 "a null switch",
1735 explanation="Failed to process a message because the switch " +
1736 " is no longer connected."),
1737 @LogMessageDoc(level="ERROR",
1738 message="Error reinjecting OFMessage on switch {switch}",
1739 explanation="An I/O error occured while attempting to " +
1740 "process an OpenFlow message",
1741 recommendation=LogMessageDoc.CHECK_SWITCH)
1742 })
1743 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1744 FloodlightContext bc) {
1745 if (sw == null) {
1746 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1747 return false;
1748 }
1749
1750 // FIXME: Do we need to be able to inject messages to switches
1751 // where we're the slave controller (i.e. they're connected but
1752 // not active)?
1753 // FIXME: Don't we need synchronization logic here so we're holding
1754 // the listener read lock when we call handleMessage? After some
1755 // discussions it sounds like the right thing to do here would be to
1756 // inject the message as a netty upstream channel event so it goes
1757 // through the normal netty event processing, including being
1758 // handled
1759 if (!activeSwitches.containsKey(sw.getId())) return false;
1760
1761 try {
1762 // Pass Floodlight context to the handleMessages()
1763 handleMessage(sw, msg, bc);
1764 } catch (IOException e) {
1765 log.error("Error reinjecting OFMessage on switch {}",
1766 HexString.toHexString(sw.getId()));
1767 return false;
1768 }
1769 return true;
1770 }
1771
1772 @Override
1773 @LogMessageDoc(message="Calling System.exit",
1774 explanation="The controller is terminating")
1775 public synchronized void terminate() {
1776 log.info("Calling System.exit");
1777 System.exit(1);
1778 }
1779
1780 @Override
1781 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1782 // call the overloaded version with floodlight context set to null
1783 return injectOfMessage(sw, msg, null);
1784 }
1785
1786 @Override
1787 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1788 FloodlightContext bc) {
1789 if (log.isTraceEnabled()) {
1790 String str = OFMessage.getDataAsString(sw, m, bc);
1791 log.trace("{}", str);
1792 }
1793
1794 List<IOFMessageListener> listeners = null;
1795 if (messageListeners.containsKey(m.getType())) {
1796 listeners =
1797 messageListeners.get(m.getType()).getOrderedListeners();
1798 }
1799
1800 if (listeners != null) {
1801 for (IOFMessageListener listener : listeners) {
1802 if (listener instanceof IOFSwitchFilter) {
1803 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1804 continue;
1805 }
1806 }
1807 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1808 break;
1809 }
1810 }
1811 }
1812 }
1813
1814 @Override
1815 public BasicFactory getOFMessageFactory() {
1816 return factory;
1817 }
1818
1819 @Override
1820 public String getControllerId() {
1821 return controllerId;
1822 }
1823
1824 // **************
1825 // Initialization
1826 // **************
1827
1828 protected void updateAllInactiveSwitchInfo() {
1829 if (role == Role.SLAVE) {
1830 return;
1831 }
1832 String controllerId = getControllerId();
1833 String[] switchColumns = { SWITCH_DATAPATH_ID,
1834 SWITCH_CONTROLLER_ID,
1835 SWITCH_ACTIVE };
1836 String[] portColumns = { PORT_ID, PORT_SWITCH };
1837 IResultSet switchResultSet = null;
1838 try {
1839 OperatorPredicate op =
1840 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1841 OperatorPredicate.Operator.EQ,
1842 controllerId);
1843 switchResultSet =
1844 storageSource.executeQuery(SWITCH_TABLE_NAME,
1845 switchColumns,
1846 op, null);
1847 while (switchResultSet.next()) {
1848 IResultSet portResultSet = null;
1849 try {
1850 String datapathId =
1851 switchResultSet.getString(SWITCH_DATAPATH_ID);
1852 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1853 op = new OperatorPredicate(PORT_SWITCH,
1854 OperatorPredicate.Operator.EQ,
1855 datapathId);
1856 portResultSet =
1857 storageSource.executeQuery(PORT_TABLE_NAME,
1858 portColumns,
1859 op, null);
1860 while (portResultSet.next()) {
1861 portResultSet.deleteRow();
1862 }
1863 portResultSet.save();
1864 }
1865 finally {
1866 if (portResultSet != null)
1867 portResultSet.close();
1868 }
1869 }
1870 switchResultSet.save();
1871 }
1872 finally {
1873 if (switchResultSet != null)
1874 switchResultSet.close();
1875 }
1876 }
1877
1878 protected void updateControllerInfo() {
1879 updateAllInactiveSwitchInfo();
1880
1881 // Write out the controller info to the storage source
1882 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1883 String id = getControllerId();
1884 controllerInfo.put(CONTROLLER_ID, id);
1885 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1886 }
1887
1888 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1889 if (role == Role.SLAVE) {
1890 return;
1891 }
1892 // Obtain the row info for the switch
1893 Map<String, Object> switchInfo = new HashMap<String, Object>();
1894 String datapathIdString = sw.getStringId();
1895 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1896 String controllerId = getControllerId();
1897 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1898 Date connectedSince = sw.getConnectedSince();
1899 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1900 Channel channel = sw.getChannel();
1901 SocketAddress socketAddress = channel.getRemoteAddress();
1902 if (socketAddress != null) {
1903 String socketAddressString = socketAddress.toString();
1904 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1905 if (socketAddress instanceof InetSocketAddress) {
1906 InetSocketAddress inetSocketAddress =
1907 (InetSocketAddress)socketAddress;
1908 InetAddress inetAddress = inetSocketAddress.getAddress();
1909 String ip = inetAddress.getHostAddress();
1910 switchInfo.put(SWITCH_IP, ip);
1911 }
1912 }
1913
1914 // Write out the switch features info
1915 long capabilities = U32.f(sw.getCapabilities());
1916 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1917 long buffers = U32.f(sw.getBuffers());
1918 switchInfo.put(SWITCH_BUFFERS, buffers);
1919 long tables = U32.f(sw.getTables());
1920 switchInfo.put(SWITCH_TABLES, tables);
1921 long actions = U32.f(sw.getActions());
1922 switchInfo.put(SWITCH_ACTIONS, actions);
1923 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1924
1925 // Update the switch
1926 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1927
1928 // Update the ports
1929 for (OFPhysicalPort port: sw.getPorts()) {
1930 updatePortInfo(sw, port);
1931 }
1932 }
1933
1934 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1935 if (role == Role.SLAVE) {
1936 return;
1937 }
1938 log.debug("Update DB with inactiveSW {}", sw);
1939 // Update the controller info in the storage source to be inactive
1940 Map<String, Object> switchInfo = new HashMap<String, Object>();
1941 String datapathIdString = sw.getStringId();
1942 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1943 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1944 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1945 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1946 }
1947
1948 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1949 if (role == Role.SLAVE) {
1950 return;
1951 }
1952 String datapathIdString = sw.getStringId();
1953 Map<String, Object> portInfo = new HashMap<String, Object>();
1954 int portNumber = U16.f(port.getPortNumber());
1955 String id = datapathIdString + "|" + portNumber;
1956 portInfo.put(PORT_ID, id);
1957 portInfo.put(PORT_SWITCH, datapathIdString);
1958 portInfo.put(PORT_NUMBER, portNumber);
1959 byte[] hardwareAddress = port.getHardwareAddress();
1960 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1961 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1962 String name = port.getName();
1963 portInfo.put(PORT_NAME, name);
1964 long config = U32.f(port.getConfig());
1965 portInfo.put(PORT_CONFIG, config);
1966 long state = U32.f(port.getState());
1967 portInfo.put(PORT_STATE, state);
1968 long currentFeatures = U32.f(port.getCurrentFeatures());
1969 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1970 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1971 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1972 long supportedFeatures = U32.f(port.getSupportedFeatures());
1973 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1974 long peerFeatures = U32.f(port.getPeerFeatures());
1975 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1976 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1977 }
1978
1979 /**
1980 * Read switch port data from storage and write it into a switch object
1981 * @param sw the switch to update
1982 */
1983 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1984 OperatorPredicate op =
1985 new OperatorPredicate(PORT_SWITCH,
1986 OperatorPredicate.Operator.EQ,
1987 sw.getStringId());
1988 IResultSet portResultSet =
1989 storageSource.executeQuery(PORT_TABLE_NAME,
1990 null, op, null);
1991 //Map<Short, OFPhysicalPort> oldports =
1992 // new HashMap<Short, OFPhysicalPort>();
1993 //oldports.putAll(sw.getPorts());
1994
1995 while (portResultSet.next()) {
1996 try {
1997 OFPhysicalPort p = new OFPhysicalPort();
1998 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1999 p.setName(portResultSet.getString(PORT_NAME));
2000 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
2001 p.setState((int)portResultSet.getLong(PORT_STATE));
2002 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
2003 p.setHardwareAddress(HexString.fromHexString(portMac));
2004 p.setCurrentFeatures((int)portResultSet.
2005 getLong(PORT_CURRENT_FEATURES));
2006 p.setAdvertisedFeatures((int)portResultSet.
2007 getLong(PORT_ADVERTISED_FEATURES));
2008 p.setSupportedFeatures((int)portResultSet.
2009 getLong(PORT_SUPPORTED_FEATURES));
2010 p.setPeerFeatures((int)portResultSet.
2011 getLong(PORT_PEER_FEATURES));
2012 //oldports.remove(Short.valueOf(p.getPortNumber()));
2013 sw.setPort(p);
2014 } catch (NullPointerException e) {
2015 // ignore
2016 }
2017 }
2018 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
2019 try {
2020 this.updates.put(update);
2021 } catch (InterruptedException e) {
2022 log.error("Failure adding update to queue", e);
2023 }
2024 }
2025
2026 protected void removePortInfo(IOFSwitch sw, short portNumber) {
2027 if (role == Role.SLAVE) {
2028 return;
2029 }
2030 String datapathIdString = sw.getStringId();
2031 String id = datapathIdString + "|" + portNumber;
2032 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
2033 }
2034
2035 /**
2036 * Sets the initial role based on properties in the config params.
2037 * It looks for two different properties.
2038 * If the "role" property is specified then the value should be
2039 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
2040 * controller is set to the specified value. If the "role" property
2041 * is not specified then it looks next for the "role.path" property.
2042 * In this case the value should be the path to a property file in
2043 * the file system that contains a property called "floodlight.role"
2044 * which can be one of the values listed above for the "role" property.
2045 * The idea behind the "role.path" mechanism is that you have some
2046 * separate heartbeat and master controller election algorithm that
2047 * determines the role of the controller. When a role transition happens,
2048 * it updates the current role in the file specified by the "role.path"
2049 * file. Then if floodlight restarts for some reason it can get the
2050 * correct current role of the controller from the file.
2051 * @param configParams The config params for the FloodlightProvider service
2052 * @return A valid role if role information is specified in the
2053 * config params, otherwise null
2054 */
2055 @LogMessageDocs({
2056 @LogMessageDoc(message="Controller role set to {role}",
2057 explanation="Setting the initial HA role to "),
2058 @LogMessageDoc(level="ERROR",
2059 message="Invalid current role value: {role}",
2060 explanation="An invalid HA role value was read from the " +
2061 "properties file",
2062 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2063 })
2064 protected Role getInitialRole(Map<String, String> configParams) {
2065 Role role = null;
2066 String roleString = configParams.get("role");
2067 if (roleString == null) {
2068 String rolePath = configParams.get("rolepath");
2069 if (rolePath != null) {
2070 Properties properties = new Properties();
2071 try {
2072 properties.load(new FileInputStream(rolePath));
2073 roleString = properties.getProperty("floodlight.role");
2074 }
2075 catch (IOException exc) {
2076 // Don't treat it as an error if the file specified by the
2077 // rolepath property doesn't exist. This lets us enable the
2078 // HA mechanism by just creating/setting the floodlight.role
2079 // property in that file without having to modify the
2080 // floodlight properties.
2081 }
2082 }
2083 }
2084
2085 if (roleString != null) {
2086 // Canonicalize the string to the form used for the enum constants
2087 roleString = roleString.trim().toUpperCase();
2088 try {
2089 role = Role.valueOf(roleString);
2090 }
2091 catch (IllegalArgumentException exc) {
2092 log.error("Invalid current role value: {}", roleString);
2093 }
2094 }
2095
2096 log.info("Controller role set to {}", role);
2097
2098 return role;
2099 }
2100
2101 /**
2102 * Tell controller that we're ready to accept switches loop
2103 * @throws IOException
2104 */
2105 @LogMessageDocs({
2106 @LogMessageDoc(message="Listening for switch connections on {address}",
2107 explanation="The controller is ready and listening for new" +
2108 " switch connections"),
2109 @LogMessageDoc(message="Storage exception in controller " +
2110 "updates loop; terminating process",
2111 explanation=ERROR_DATABASE,
2112 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2113 @LogMessageDoc(level="ERROR",
2114 message="Exception in controller updates loop",
2115 explanation="Failed to dispatch controller event",
2116 recommendation=LogMessageDoc.GENERIC_ACTION)
2117 })
2118 public void run() {
2119 if (log.isDebugEnabled()) {
2120 logListeners();
2121 }
2122
2123 try {
2124 final ServerBootstrap bootstrap = createServerBootStrap();
2125
2126 bootstrap.setOption("reuseAddr", true);
2127 bootstrap.setOption("child.keepAlive", true);
2128 bootstrap.setOption("child.tcpNoDelay", true);
2129 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2130
2131 ChannelPipelineFactory pfact =
2132 new OpenflowPipelineFactory(this, null);
2133 bootstrap.setPipelineFactory(pfact);
2134 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2135 final ChannelGroup cg = new DefaultChannelGroup();
2136 cg.add(bootstrap.bind(sa));
2137
2138 log.info("Listening for switch connections on {}", sa);
2139 } catch (Exception e) {
2140 throw new RuntimeException(e);
2141 }
2142
2143 // main loop
2144 while (true) {
2145 try {
2146 IUpdate update = updates.take();
2147 update.dispatch();
2148 } catch (InterruptedException e) {
2149 return;
2150 } catch (StorageException e) {
2151 log.error("Storage exception in controller " +
2152 "updates loop; terminating process", e);
2153 return;
2154 } catch (Exception e) {
2155 log.error("Exception in controller updates loop", e);
2156 }
2157 }
2158 }
2159
2160 private ServerBootstrap createServerBootStrap() {
2161 if (workerThreads == 0) {
2162 return new ServerBootstrap(
2163 new NioServerSocketChannelFactory(
2164 Executors.newCachedThreadPool(),
2165 Executors.newCachedThreadPool()));
2166 } else {
2167 return new ServerBootstrap(
2168 new NioServerSocketChannelFactory(
2169 Executors.newCachedThreadPool(),
2170 Executors.newCachedThreadPool(), workerThreads));
2171 }
2172 }
2173
2174 public void setConfigParams(Map<String, String> configParams) {
2175 String ofPort = configParams.get("openflowport");
2176 if (ofPort != null) {
2177 this.openFlowPort = Integer.parseInt(ofPort);
2178 }
2179 log.debug("OpenFlow port set to {}", this.openFlowPort);
2180 String threads = configParams.get("workerthreads");
2181 if (threads != null) {
2182 this.workerThreads = Integer.parseInt(threads);
2183 }
2184 log.debug("Number of worker threads set to {}", this.workerThreads);
2185 String controllerId = configParams.get("controllerid");
2186 if (controllerId != null) {
2187 this.controllerId = controllerId;
2188 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002189 else {
2190 //Try to get the hostname of the machine and use that for controller ID
2191 try {
2192 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2193 this.controllerId = hostname;
2194 } catch (UnknownHostException e) {
2195 // Can't get hostname, we'll just use the default
2196 }
2197 }
2198
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002199 log.debug("ControllerId set to {}", this.controllerId);
2200 }
2201
2202 private void initVendorMessages() {
2203 // Configure openflowj to be able to parse the role request/reply
2204 // vendor messages.
2205 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2206 OFNiciraVendorData.NX_VENDOR_ID, 4);
2207 OFVendorId.registerVendorId(niciraVendorId);
2208 OFBasicVendorDataType roleRequestVendorData =
2209 new OFBasicVendorDataType(
2210 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2211 OFRoleRequestVendorData.getInstantiable());
2212 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2213 OFBasicVendorDataType roleReplyVendorData =
2214 new OFBasicVendorDataType(
2215 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2216 OFRoleReplyVendorData.getInstantiable());
2217 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2218 }
2219
2220 /**
2221 * Initialize internal data structures
2222 */
2223 public void init(Map<String, String> configParams) {
2224 // These data structures are initialized here because other
2225 // module's startUp() might be called before ours
2226 this.messageListeners =
2227 new ConcurrentHashMap<OFType,
2228 ListenerDispatcher<OFType,
2229 IOFMessageListener>>();
2230 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2231 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2232 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2233 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2234 this.controllerNodeIPsCache = new HashMap<String, String>();
2235 this.updates = new LinkedBlockingQueue<IUpdate>();
2236 this.factory = new BasicFactory();
2237 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2238 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002239 //this.role = getInitialRole(configParams);
2240 //Set the controller's role to MASTER so it always tries to do role requests.
2241 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002242 this.roleChanger = new RoleChanger();
2243 initVendorMessages();
2244 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002245 }
2246
2247 /**
2248 * Startup all of the controller's components
2249 */
2250 @LogMessageDoc(message="Waiting for storage source",
2251 explanation="The system database is not yet ready",
2252 recommendation="If this message persists, this indicates " +
2253 "that the system database has failed to start. " +
2254 LogMessageDoc.CHECK_CONTROLLER)
2255 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002256 try {
2257 registryService.registerController(controllerId);
2258 } catch (RegistryException e2) {
2259 log.warn("Registry service error: {}", e2.getMessage());
2260 }
2261
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002262 // Create the table names we use
2263 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2264 storageSource.createTable(SWITCH_TABLE_NAME, null);
2265 storageSource.createTable(PORT_TABLE_NAME, null);
2266 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2267 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2268 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2269 CONTROLLER_ID);
2270 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2271 SWITCH_DATAPATH_ID);
2272 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2273 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2274 CONTROLLER_INTERFACE_ID);
2275 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2276
2277 while (true) {
2278 try {
2279 updateControllerInfo();
2280 break;
2281 }
2282 catch (StorageException e) {
2283 log.info("Waiting for storage source");
2284 try {
2285 Thread.sleep(1000);
2286 } catch (InterruptedException e1) {
2287 }
2288 }
2289 }
2290
2291 // Add our REST API
2292 restApi.addRestletRoutable(new CoreWebRoutable());
2293 }
2294
2295 @Override
2296 public void addInfoProvider(String type, IInfoProvider provider) {
2297 if (!providerMap.containsKey(type)) {
2298 providerMap.put(type, new ArrayList<IInfoProvider>());
2299 }
2300 providerMap.get(type).add(provider);
2301 }
2302
2303 @Override
2304 public void removeInfoProvider(String type, IInfoProvider provider) {
2305 if (!providerMap.containsKey(type)) {
2306 log.debug("Provider type {} doesn't exist.", type);
2307 return;
2308 }
2309
2310 providerMap.get(type).remove(provider);
2311 }
2312
2313 public Map<String, Object> getControllerInfo(String type) {
2314 if (!providerMap.containsKey(type)) return null;
2315
2316 Map<String, Object> result = new LinkedHashMap<String, Object>();
2317 for (IInfoProvider provider : providerMap.get(type)) {
2318 result.putAll(provider.getInfo(type));
2319 }
2320
2321 return result;
2322 }
2323
2324 @Override
2325 public void addHAListener(IHAListener listener) {
2326 this.haListeners.add(listener);
2327 }
2328
2329 @Override
2330 public void removeHAListener(IHAListener listener) {
2331 this.haListeners.remove(listener);
2332 }
2333
2334
2335 /**
2336 * Handle changes to the controller nodes IPs and dispatch update.
2337 */
2338 @SuppressWarnings("unchecked")
2339 protected void handleControllerNodeIPChanges() {
2340 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2341 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2342 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2343 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2344 CONTROLLER_INTERFACE_TYPE,
2345 CONTROLLER_INTERFACE_NUMBER,
2346 CONTROLLER_INTERFACE_DISCOVERED_IP };
2347 synchronized(controllerNodeIPsCache) {
2348 // We currently assume that interface Ethernet0 is the relevant
2349 // controller interface. Might change.
2350 // We could (should?) implement this using
2351 // predicates, but creating the individual and compound predicate
2352 // seems more overhead then just checking every row. Particularly,
2353 // since the number of rows is small and changes infrequent
2354 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2355 colNames,null, null);
2356 while (res.next()) {
2357 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2358 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2359 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2360 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2361 String curIP = controllerNodeIPsCache.get(controllerID);
2362
2363 curControllerNodeIPs.put(controllerID, discoveredIP);
2364 if (curIP == null) {
2365 // new controller node IP
2366 addedControllerNodeIPs.put(controllerID, discoveredIP);
2367 }
2368 else if (curIP != discoveredIP) {
2369 // IP changed
2370 removedControllerNodeIPs.put(controllerID, curIP);
2371 addedControllerNodeIPs.put(controllerID, discoveredIP);
2372 }
2373 }
2374 }
2375 // Now figure out if rows have been deleted. We can't use the
2376 // rowKeys from rowsDeleted directly, since the tables primary
2377 // key is a compound that we can't disassemble
2378 Set<String> curEntries = curControllerNodeIPs.keySet();
2379 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2380 removedEntries.removeAll(curEntries);
2381 for (String removedControllerID : removedEntries)
2382 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2383 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2384 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2385 curControllerNodeIPs, addedControllerNodeIPs,
2386 removedControllerNodeIPs);
2387 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2388 try {
2389 this.updates.put(update);
2390 } catch (InterruptedException e) {
2391 log.error("Failure adding update to queue", e);
2392 }
2393 }
2394 }
2395 }
2396
2397 @Override
2398 public Map<String, String> getControllerNodeIPs() {
2399 // We return a copy of the mapping so we can guarantee that
2400 // the mapping return is the same as one that will be (or was)
2401 // dispatched to IHAListeners
2402 HashMap<String,String> retval = new HashMap<String,String>();
2403 synchronized(controllerNodeIPsCache) {
2404 retval.putAll(controllerNodeIPsCache);
2405 }
2406 return retval;
2407 }
2408
2409 @Override
2410 public void rowsModified(String tableName, Set<Object> rowKeys) {
2411 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2412 handleControllerNodeIPChanges();
2413 }
2414
2415 }
2416
2417 @Override
2418 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2419 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2420 handleControllerNodeIPChanges();
2421 }
2422 }
2423
2424 @Override
2425 public long getSystemStartTime() {
2426 return (this.systemStartTime);
2427 }
2428
2429 @Override
2430 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2431 this.alwaysClearFlowsOnSwAdd = value;
2432 }
2433
2434 public boolean getAlwaysClearFlowsOnSwAdd() {
2435 return this.alwaysClearFlowsOnSwAdd;
2436 }
2437}