blob: 21eceb31c899ffeeedcec8351fc69dd1a8509859 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080058import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berdedc73bb12013-08-14 13:46:38 -070061import net.floodlightcontroller.core.IUpdate;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080062import net.floodlightcontroller.core.annotations.LogMessageDoc;
63import net.floodlightcontroller.core.annotations.LogMessageDocs;
64import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
65import net.floodlightcontroller.core.util.ListenerDispatcher;
66import net.floodlightcontroller.core.web.CoreWebRoutable;
67import net.floodlightcontroller.counter.ICounterStoreService;
68import net.floodlightcontroller.packet.Ethernet;
69import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
70import net.floodlightcontroller.restserver.IRestApiService;
71import net.floodlightcontroller.storage.IResultSet;
72import net.floodlightcontroller.storage.IStorageSourceListener;
73import net.floodlightcontroller.storage.IStorageSourceService;
74import net.floodlightcontroller.storage.OperatorPredicate;
75import net.floodlightcontroller.storage.StorageException;
76import net.floodlightcontroller.threadpool.IThreadPoolService;
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -070077import net.onrc.onos.ofcontroller.core.IOFSwitchPortListener;
HIGUCHI Yuta60a10142013-06-14 15:50:10 -070078import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080079import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080080import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080081import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080082
83import org.jboss.netty.bootstrap.ServerBootstrap;
84import org.jboss.netty.buffer.ChannelBuffer;
85import org.jboss.netty.buffer.ChannelBuffers;
86import org.jboss.netty.channel.Channel;
87import org.jboss.netty.channel.ChannelHandlerContext;
88import org.jboss.netty.channel.ChannelPipelineFactory;
89import org.jboss.netty.channel.ChannelStateEvent;
90import org.jboss.netty.channel.ChannelUpstreamHandler;
91import org.jboss.netty.channel.Channels;
92import org.jboss.netty.channel.ExceptionEvent;
93import org.jboss.netty.channel.MessageEvent;
94import org.jboss.netty.channel.group.ChannelGroup;
95import org.jboss.netty.channel.group.DefaultChannelGroup;
96import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
97import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
98import org.jboss.netty.handler.timeout.IdleStateEvent;
99import org.jboss.netty.handler.timeout.ReadTimeoutException;
100import org.openflow.protocol.OFEchoReply;
101import org.openflow.protocol.OFError;
102import org.openflow.protocol.OFError.OFBadActionCode;
103import org.openflow.protocol.OFError.OFBadRequestCode;
104import org.openflow.protocol.OFError.OFErrorType;
105import org.openflow.protocol.OFError.OFFlowModFailedCode;
106import org.openflow.protocol.OFError.OFHelloFailedCode;
107import org.openflow.protocol.OFError.OFPortModFailedCode;
108import org.openflow.protocol.OFError.OFQueueOpFailedCode;
109import org.openflow.protocol.OFFeaturesReply;
110import org.openflow.protocol.OFGetConfigReply;
111import org.openflow.protocol.OFMessage;
112import org.openflow.protocol.OFPacketIn;
113import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800114import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800115import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800116import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800117import org.openflow.protocol.OFPortStatus.OFPortReason;
118import org.openflow.protocol.OFSetConfig;
119import org.openflow.protocol.OFStatisticsRequest;
120import org.openflow.protocol.OFSwitchConfig;
121import org.openflow.protocol.OFType;
122import org.openflow.protocol.OFVendor;
123import org.openflow.protocol.factory.BasicFactory;
124import org.openflow.protocol.factory.MessageParseException;
125import org.openflow.protocol.statistics.OFDescriptionStatistics;
126import org.openflow.protocol.statistics.OFStatistics;
127import org.openflow.protocol.statistics.OFStatisticsType;
128import org.openflow.protocol.vendor.OFBasicVendorDataType;
129import org.openflow.protocol.vendor.OFBasicVendorId;
130import org.openflow.protocol.vendor.OFVendorId;
131import org.openflow.util.HexString;
132import org.openflow.util.U16;
133import org.openflow.util.U32;
134import org.openflow.vendor.nicira.OFNiciraVendorData;
135import org.openflow.vendor.nicira.OFRoleReplyVendorData;
136import org.openflow.vendor.nicira.OFRoleRequestVendorData;
137import org.openflow.vendor.nicira.OFRoleVendorData;
138import org.slf4j.Logger;
139import org.slf4j.LoggerFactory;
140
141
142/**
143 * The main controller class. Handles all setup and network listeners
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700144 *
145 * Extensions made by ONOS are:
146 * - Detailed Port event: PORTCHANGED -> {PORTCHANGED, PORTADDED, PORTREMOVED}
147 * Available as net.onrc.onos.ofcontroller.core.IOFSwitchPortListener
148 * - Distributed ownership control of switch through RegistryService(IControllerRegistryService)
Pavlin Radoslavovddd01ba2013-07-03 15:40:44 -0700149 * - Register ONOS services. (IFlowService, IControllerRegistryService)
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700150 * - Additional DEBUG logs
151 * - Try using hostname as controller ID, when ID was not explicitly given.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800152 */
153public class Controller implements IFloodlightProviderService,
154 IStorageSourceListener {
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700155
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800156 protected static Logger log = LoggerFactory.getLogger(Controller.class);
157
158 private static final String ERROR_DATABASE =
159 "The controller could not communicate with the system database.";
160
161 protected BasicFactory factory;
162 protected ConcurrentMap<OFType,
163 ListenerDispatcher<OFType,IOFMessageListener>>
164 messageListeners;
165 // The activeSwitches map contains only those switches that are actively
166 // being controlled by us -- it doesn't contain switches that are
167 // in the slave role
168 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
169 // connectedSwitches contains all connected switches, including ones where
170 // we're a slave controller. We need to keep track of them so that we can
171 // send role request messages to switches when our role changes to master
172 // We add a switch to this set after it successfully completes the
173 // handshake. Access to this Set needs to be synchronized with roleChanger
174 protected HashSet<OFSwitchImpl> connectedSwitches;
175
176 // The controllerNodeIPsCache maps Controller IDs to their IP address.
177 // It's only used by handleControllerNodeIPsChanged
178 protected HashMap<String, String> controllerNodeIPsCache;
179
180 protected Set<IOFSwitchListener> switchListeners;
181 protected Set<IHAListener> haListeners;
182 protected Map<String, List<IInfoProvider>> providerMap;
183 protected BlockingQueue<IUpdate> updates;
184
185 // Module dependencies
186 protected IRestApiService restApi;
187 protected ICounterStoreService counterStore = null;
188 protected IStorageSourceService storageSource;
189 protected IPktInProcessingTimeService pktinProcTime;
190 protected IThreadPoolService threadPool;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800191 protected IFlowService flowService;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800192 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800193
194 // Configuration options
195 protected int openFlowPort = 6633;
196 protected int workerThreads = 0;
197 // The id for this controller node. Should be unique for each controller
198 // node in a controller cluster.
199 protected String controllerId = "localhost";
200
201 // The current role of the controller.
202 // If the controller isn't configured to support roles, then this is null.
203 protected Role role;
204 // A helper that handles sending and timeout handling for role requests
205 protected RoleChanger roleChanger;
206
207 // Start time of the controller
208 protected long systemStartTime;
209
210 // Flag to always flush flow table on switch reconnect (HA or otherwise)
211 protected boolean alwaysClearFlowsOnSwAdd = false;
212
213 // Storage table names
214 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
215 protected static final String CONTROLLER_ID = "id";
216
217 protected static final String SWITCH_TABLE_NAME = "controller_switch";
218 protected static final String SWITCH_DATAPATH_ID = "dpid";
219 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
220 protected static final String SWITCH_IP = "ip";
221 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
222 protected static final String SWITCH_ACTIVE = "active";
223 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
224 protected static final String SWITCH_CAPABILITIES = "capabilities";
225 protected static final String SWITCH_BUFFERS = "buffers";
226 protected static final String SWITCH_TABLES = "tables";
227 protected static final String SWITCH_ACTIONS = "actions";
228
229 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
230 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
231
232 protected static final String PORT_TABLE_NAME = "controller_port";
233 protected static final String PORT_ID = "id";
234 protected static final String PORT_SWITCH = "switch_id";
235 protected static final String PORT_NUMBER = "number";
236 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
237 protected static final String PORT_NAME = "name";
238 protected static final String PORT_CONFIG = "config";
239 protected static final String PORT_STATE = "state";
240 protected static final String PORT_CURRENT_FEATURES = "current_features";
241 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
242 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
243 protected static final String PORT_PEER_FEATURES = "peer_features";
244
245 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
246 protected static final String CONTROLLER_INTERFACE_ID = "id";
247 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
248 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
249 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
250 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
251
252
253
254 // Perf. related configuration
255 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
256 protected static final int BATCH_MAX_SIZE = 100;
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700257 protected static final boolean ALWAYS_DECODE_ETH = true;
258
/**
 * Kinds of switch-related events that a {@code SwitchUpdate} can carry.
 */
public enum SwitchUpdateType {
    /** Delivered to listeners through {@code addedSwitch}. */
    ADDED,
    /** Delivered to listeners through {@code removedSwitch}. */
    REMOVED,
    /** Delivered to listeners through {@code switchPortChanged}. */
    PORTCHANGED,
    /** Delivered through {@code switchPortAdded} to port-aware listeners only. */
    PORTADDED,
    /** Delivered through {@code switchPortRemoved} to port-aware listeners only. */
    PORTREMOVED
}
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700266
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800267 /**
268 * Update message indicating a switch was added or removed
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700269 * ONOS: This message extended to indicate Port add or removed event.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800270 */
271 protected class SwitchUpdate implements IUpdate {
272 public IOFSwitch sw;
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700273 public OFPhysicalPort port; // Added by ONOS
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800274 public SwitchUpdateType switchUpdateType;
275 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
276 this.sw = sw;
277 this.switchUpdateType = switchUpdateType;
278 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700279 public SwitchUpdate(IOFSwitch sw, OFPhysicalPort port, SwitchUpdateType switchUpdateType) {
280 this.sw = sw;
281 this.port = port;
282 this.switchUpdateType = switchUpdateType;
283 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800284 public void dispatch() {
285 if (log.isTraceEnabled()) {
286 log.trace("Dispatching switch update {} {}",
287 sw, switchUpdateType);
288 }
289 if (switchListeners != null) {
290 for (IOFSwitchListener listener : switchListeners) {
291 switch(switchUpdateType) {
292 case ADDED:
293 listener.addedSwitch(sw);
294 break;
295 case REMOVED:
296 listener.removedSwitch(sw);
297 break;
298 case PORTCHANGED:
299 listener.switchPortChanged(sw.getId());
300 break;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700301 case PORTADDED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700302 if (listener instanceof IOFSwitchPortListener) {
303 ((IOFSwitchPortListener) listener).switchPortAdded(sw.getId(), port);
304 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700305 break;
306 case PORTREMOVED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700307 if (listener instanceof IOFSwitchPortListener) {
308 ((IOFSwitchPortListener) listener).switchPortRemoved(sw.getId(), port);
309 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700310 break;
311 default:
312 break;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800313 }
314 }
315 }
316 }
317 }
318
319 /**
320 * Update message indicating controller's role has changed
321 */
322 protected class HARoleUpdate implements IUpdate {
323 public Role oldRole;
324 public Role newRole;
325 public HARoleUpdate(Role newRole, Role oldRole) {
326 this.oldRole = oldRole;
327 this.newRole = newRole;
328 }
329 public void dispatch() {
330 // Make sure that old and new roles are different.
331 if (oldRole == newRole) {
332 if (log.isTraceEnabled()) {
333 log.trace("HA role update ignored as the old and " +
334 "new roles are the same. newRole = {}" +
335 "oldRole = {}", newRole, oldRole);
336 }
337 return;
338 }
339 if (log.isTraceEnabled()) {
340 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
341 newRole, oldRole);
342 }
343 if (haListeners != null) {
344 for (IHAListener listener : haListeners) {
345 listener.roleChanged(oldRole, newRole);
346 }
347 }
348 }
349 }
350
351 /**
352 * Update message indicating
353 * IPs of controllers in controller cluster have changed.
354 */
355 protected class HAControllerNodeIPUpdate implements IUpdate {
356 public Map<String,String> curControllerNodeIPs;
357 public Map<String,String> addedControllerNodeIPs;
358 public Map<String,String> removedControllerNodeIPs;
359 public HAControllerNodeIPUpdate(
360 HashMap<String,String> curControllerNodeIPs,
361 HashMap<String,String> addedControllerNodeIPs,
362 HashMap<String,String> removedControllerNodeIPs) {
363 this.curControllerNodeIPs = curControllerNodeIPs;
364 this.addedControllerNodeIPs = addedControllerNodeIPs;
365 this.removedControllerNodeIPs = removedControllerNodeIPs;
366 }
367 public void dispatch() {
368 if (log.isTraceEnabled()) {
369 log.trace("Dispatching HA Controller Node IP update "
370 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
371 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
372 removedControllerNodeIPs }
373 );
374 }
375 if (haListeners != null) {
376 for (IHAListener listener: haListeners) {
377 listener.controllerNodeIPsChanged(curControllerNodeIPs,
378 addedControllerNodeIPs, removedControllerNodeIPs);
379 }
380 }
381 }
382 }
383
384 // ***************
385 // Getters/Setters
386 // ***************
387
388 public void setStorageSourceService(IStorageSourceService storageSource) {
389 this.storageSource = storageSource;
390 }
391
392 public void setCounterStore(ICounterStoreService counterStore) {
393 this.counterStore = counterStore;
394 }
395
396 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
397 this.pktinProcTime = pits;
398 }
399
400 public void setRestApiService(IRestApiService restApi) {
401 this.restApi = restApi;
402 }
403
404 public void setThreadPoolService(IThreadPoolService tp) {
405 this.threadPool = tp;
406 }
407
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800408 public void setFlowService(IFlowService serviceImpl) {
409 this.flowService = serviceImpl;
410 }
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800411
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800412 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800413 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800414 }
415
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800416 @Override
417 public Role getRole() {
418 synchronized(roleChanger) {
419 return role;
420 }
421 }
422
423 @Override
424 public void setRole(Role role) {
425 if (role == null) throw new NullPointerException("Role can not be null.");
426 if (role == Role.MASTER && this.role == Role.SLAVE) {
427 // Reset db state to Inactive for all switches.
428 updateAllInactiveSwitchInfo();
429 }
430
431 // Need to synchronize to ensure a reliable ordering on role request
432 // messages send and to ensure the list of connected switches is stable
433 // RoleChanger will handle the actual sending of the message and
434 // timeout handling
435 // @see RoleChanger
436 synchronized(roleChanger) {
437 if (role.equals(this.role)) {
438 log.debug("Ignoring role change: role is already {}", role);
439 return;
440 }
441
442 Role oldRole = this.role;
443 this.role = role;
444
445 log.debug("Submitting role change request to role {}", role);
446 roleChanger.submitRequest(connectedSwitches, role);
447
448 // Enqueue an update for our listeners.
449 try {
450 this.updates.put(new HARoleUpdate(role, oldRole));
451 } catch (InterruptedException e) {
452 log.error("Failure adding update to queue", e);
453 }
454 }
455 }
456
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700457 public void publishUpdate(IUpdate update) {
458 try {
459 this.updates.put(update);
460 } catch (InterruptedException e) {
461 log.error("Failure adding update to queue", e);
462 }
463 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800464
465 // **********************
466 // ChannelUpstreamHandler
467 // **********************
468
469 /**
470 * Return a new channel handler for processing a switch connections
471 * @param state The channel state object for the connection
472 * @return the new channel handler
473 */
474 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
475 return new OFChannelHandler(state);
476 }
477
Jonathan Hartcc957a02013-02-26 10:39:04 -0800478 protected class RoleChangeCallback implements ControlChangeCallback {
479 @Override
480 public void controlChanged(long dpid, boolean hasControl) {
481 log.info("Role change callback for switch {}, hasControl {}",
482 HexString.toHexString(dpid), hasControl);
483
484 synchronized(roleChanger){
485 OFSwitchImpl sw = null;
486 for (OFSwitchImpl connectedSw : connectedSwitches){
487 if (connectedSw.getId() == dpid){
488 sw = connectedSw;
489 break;
490 }
491 }
492 if (sw == null){
493 log.warn("Switch {} not found in connected switches",
494 HexString.toHexString(dpid));
495 return;
496 }
497
498 Role role = null;
499
Pankaj Berde01939e92013-03-08 14:38:27 -0800500 /*
501 * issue #229
502 * Cannot rely on sw.getRole() as it can be behind due to pending
503 * role changes in the queue. Just submit it and late the RoleChanger
504 * handle duplicates.
505 */
506
507 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800508 role = Role.MASTER;
509 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800510 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800511 role = Role.SLAVE;
512 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800513
514 log.debug("Sending role request {} msg to {}", role, sw);
515 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
516 swList.add(sw);
517 roleChanger.submitRequest(swList, role);
518
Jonathan Hartcc957a02013-02-26 10:39:04 -0800519 }
520
521 }
522 }
523
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800524 /**
525 * Channel handler deals with the switch connection and dispatches
526 * switch messages to the appropriate locations.
527 * @author readams
528 */
529 protected class OFChannelHandler
530 extends IdleStateAwareChannelUpstreamHandler {
531 protected OFSwitchImpl sw;
532 protected OFChannelState state;
533
/**
 * Creates a handler bound to the given per-connection state.
 *
 * @param state the channel state object for the connection
 */
public OFChannelHandler(OFChannelState state) {
    this.state = state;
}
537
538 @Override
539 @LogMessageDoc(message="New switch connection from {ip address}",
540 explanation="A new switch has connected from the " +
541 "specified IP address")
542 public void channelConnected(ChannelHandlerContext ctx,
543 ChannelStateEvent e) throws Exception {
544 log.info("New switch connection from {}",
545 e.getChannel().getRemoteAddress());
546
547 sw = new OFSwitchImpl();
548 sw.setChannel(e.getChannel());
549 sw.setFloodlightProvider(Controller.this);
550 sw.setThreadPoolService(threadPool);
551
552 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
553 msglist.add(factory.getMessage(OFType.HELLO));
554 e.getChannel().write(msglist);
555
556 }
557
558 @Override
559 @LogMessageDoc(message="Disconnected switch {switch information}",
560 explanation="The specified switch has disconnected.")
561 public void channelDisconnected(ChannelHandlerContext ctx,
562 ChannelStateEvent e) throws Exception {
563 if (sw != null && state.hsState == HandshakeState.READY) {
564 if (activeSwitches.containsKey(sw.getId())) {
565 // It's safe to call removeSwitch even though the map might
566 // not contain this particular switch but another with the
567 // same DPID
568 removeSwitch(sw);
569 }
570 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700571 if (controlRequested) {
572 registryService.releaseControl(sw.getId());
573 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800574 connectedSwitches.remove(sw);
575 }
576 sw.setConnected(false);
577 }
578 log.info("Disconnected switch {}", sw);
579 }
580
/**
 * Reacts to an exception raised on the channel. Most causes are logged
 * and the channel is closed. The exceptions are: an already closed
 * channel (ignored), a storage exception (the controller is terminated)
 * and a rejected execution (logged only).
 */
@Override
@LogMessageDocs({
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to read timeout",
            explanation="The connected switch has failed to send any " +
                        "messages or respond to echo requests",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch}: failed to " +
                    "complete handshake",
            explanation="The switch did not respond correctly " +
                        "to handshake messages",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to IO Error: {}",
            explanation="There was an error communicating with the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to switch " +
                    "state error: {error}",
            explanation="The switch sent an unexpected message",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to " +
                    "message parse failure",
            explanation="Could not parse a message from the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Terminating controller due to storage exception",
            explanation=ERROR_DATABASE,
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Could not process message: queue full",
            explanation="OpenFlow messages are arriving faster than " +
                        " the controller can process them.",
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Error while processing message " +
                    "from switch {switch} {cause}",
            explanation="An error occurred processing the switch message",
            recommendation=LogMessageDoc.GENERIC_ACTION)
})
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    final Throwable cause = e.getCause();
    // Every branch closes the channel unless it clears this flag.
    boolean closeChannel = true;

    if (cause instanceof ReadTimeoutException) {
        // switch timeout
        log.error("Disconnecting switch {} due to read timeout", sw);
    } else if (cause instanceof HandshakeTimeoutException) {
        log.error("Disconnecting switch {}: failed to complete handshake",
                  sw);
    } else if (cause instanceof ClosedChannelException) {
        // Channel is already closed; nothing to log or close.
        // Must be tested before the more general IOException.
        closeChannel = false;
    } else if (cause instanceof IOException) {
        log.error("Disconnecting switch {} due to IO Error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof SwitchStateException) {
        log.error("Disconnecting switch {} due to switch state error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof MessageParseException) {
        log.error("Disconnecting switch " + sw +
                  " due to message parse failure",
                  cause);
    } else if (cause instanceof StorageException) {
        log.error("Terminating controller due to storage exception",
                  cause);
        terminate();
        closeChannel = false;
    } else if (cause instanceof RejectedExecutionException) {
        log.warn("Could not process message: queue full");
        closeChannel = false;
    } else {
        log.error("Error while processing message from switch " + sw,
                  cause);
    }

    if (closeChannel) {
        ctx.getChannel().close();
    }
}
660
661 @Override
662 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
663 throws Exception {
664 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
665 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
666 e.getChannel().write(msglist);
667 }
668
669 @Override
670 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
671 throws Exception {
672 if (e.getMessage() instanceof List) {
673 @SuppressWarnings("unchecked")
674 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
675
676 for (OFMessage ofm : msglist) {
677 try {
678 processOFMessage(ofm);
679 }
680 catch (Exception ex) {
681 // We are the last handler in the stream, so run the
682 // exception through the channel again by passing in
683 // ctx.getChannel().
684 Channels.fireExceptionCaught(ctx.getChannel(), ex);
685 }
686 }
687
688 // Flush all flow-mods/packet-out generated from this "train"
689 OFSwitchImpl.flush_all();
690 }
691 }
692
693 /**
694 * Process the request for the switch description
695 */
696 @LogMessageDoc(level="ERROR",
697 message="Exception in reading description " +
698 " during handshake {exception}",
699 explanation="Could not process the switch description string",
700 recommendation=LogMessageDoc.CHECK_SWITCH)
701 void processSwitchDescReply() {
702 try {
703 // Read description, if it has been updated
704 @SuppressWarnings("unchecked")
705 Future<List<OFStatistics>> desc_future =
706 (Future<List<OFStatistics>>)sw.
707 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
708 List<OFStatistics> values =
709 desc_future.get(0, TimeUnit.MILLISECONDS);
710 if (values != null) {
711 OFDescriptionStatistics description =
712 new OFDescriptionStatistics();
713 ChannelBuffer data =
714 ChannelBuffers.buffer(description.getLength());
715 for (OFStatistics f : values) {
716 f.writeTo(data);
717 description.readFrom(data);
718 break; // SHOULD be a list of length 1
719 }
720 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
721 description);
722 sw.setSwitchProperties(description);
723 data = null;
724
725 // At this time, also set other switch properties from storage
726 boolean is_core_switch = false;
727 IResultSet resultSet = null;
728 try {
729 String swid = sw.getStringId();
730 resultSet =
731 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
732 for (Iterator<IResultSet> it =
733 resultSet.iterator(); it.hasNext();) {
734 // In case of multiple rows, use the status
735 // in last row?
736 Map<String, Object> row = it.next().getRow();
737 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
738 if (log.isDebugEnabled()) {
739 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
740 "config for switch={}, is-core={}",
741 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
742 }
743 String ics =
744 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
745 is_core_switch = ics.equals("true");
746 }
747 }
748 }
749 finally {
750 if (resultSet != null)
751 resultSet.close();
752 }
753 if (is_core_switch) {
754 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
755 new Boolean(true));
756 }
757 }
758 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
759 state.hasDescription = true;
760 checkSwitchReady();
761 }
762 catch (InterruptedException ex) {
763 // Ignore
764 }
765 catch (TimeoutException ex) {
766 // Ignore
767 } catch (Exception ex) {
768 log.error("Exception in reading description " +
769 " during handshake", ex);
770 }
771 }
772
773 /**
774 * Send initial switch setup information that we need before adding
775 * the switch
776 * @throws IOException
777 */
778 void sendHelloConfiguration() throws IOException {
779 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700780 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800781 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
782 }
783
784 /**
785 * Send the configuration requests we can only do after we have
786 * the features reply
787 * @throws IOException
788 */
789 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700790 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800791 // Ensure we receive the full packet via PacketIn
792 OFSetConfig config = (OFSetConfig) factory
793 .getMessage(OFType.SET_CONFIG);
794 config.setMissSendLength((short) 0xffff)
795 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
796 sw.write(config, null);
797 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
798 null);
799
800 // Get Description to set switch-specific flags
801 OFStatisticsRequest req = new OFStatisticsRequest();
802 req.setStatisticType(OFStatisticsType.DESC);
803 req.setLengthU(req.getLengthU());
804 Future<List<OFStatistics>> dfuture =
805 sw.getStatistics(req);
806 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
807 dfuture);
808
809 }
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700810
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700811 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800812 protected void checkSwitchReady() {
813 if (state.hsState == HandshakeState.FEATURES_REPLY &&
814 state.hasDescription && state.hasGetConfigReply) {
815
816 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700817 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800818
819 synchronized(roleChanger) {
820 // We need to keep track of all of the switches that are connected
821 // to the controller, in any role, so that we can later send the
822 // role request messages when the controller role changes.
823 // We need to be synchronized while doing this: we must not
824 // send a another role request to the connectedSwitches until
825 // we were able to add this new switch to connectedSwitches
826 // *and* send the current role to the new switch.
827 connectedSwitches.add(sw);
828
829 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800830 //Put the switch in SLAVE mode until we know we have control
831 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
832 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
833 swList.add(sw);
834 roleChanger.submitRequest(swList, Role.SLAVE);
835
Jonathan Hartcc957a02013-02-26 10:39:04 -0800836 //Request control of the switch from the global registry
837 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700838 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800839 registryService.requestControl(sw.getId(),
840 new RoleChangeCallback());
841 } catch (RegistryException e) {
842 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700843 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800844 }
845
Jonathan Hart97801ac2013-02-26 14:29:16 -0800846
Jonathan Hartcc957a02013-02-26 10:39:04 -0800847
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800848 // Send a role request if role support is enabled for the controller
849 // This is a probe that we'll use to determine if the switch
850 // actually supports the role request message. If it does we'll
851 // get back a role reply message. If it doesn't we'll get back an
852 // OFError message.
853 // If role is MASTER we will promote switch to active
854 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800855 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800856 log.debug("This controller's role is {}, " +
857 "sending initial role request msg to {}",
858 role, sw);
859 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
860 swList.add(sw);
861 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800862 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800863 }
864 else {
865 // Role supported not enabled on controller (for now)
866 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800867 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800868 "not sending role request msg to {}",
869 role, sw);
870 // Need to clear FlowMods before we add the switch
871 // and dispatch updates otherwise we have a race condition.
872 sw.clearAllFlowMods();
873 addSwitch(sw);
874 state.firstRoleReplyReceived = true;
875 }
876 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700877 if (!controlRequested) {
878 // yield to allow other thread(s) to release control
879 try {
880 Thread.sleep(10);
881 } catch (InterruptedException e) {
882 // Ignore interruptions
883 }
884 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700885 log.debug("Closing {} because we weren't able to request control " +
886 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700887 sw.channel.close();
888 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800889 }
890 }
891
892 /* Handle a role reply message we received from the switch. Since
893 * netty serializes message dispatch we don't need to synchronize
894 * against other receive operations from the same switch, so no need
895 * to synchronize addSwitch(), removeSwitch() operations from the same
896 * connection.
897 * FIXME: However, when a switch with the same DPID connects we do
898 * need some synchronization. However, handling switches with same
899 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
900 * removedSwitch notification):1
901 *
902 */
903 @LogMessageDoc(level="ERROR",
904 message="Invalid role value in role reply message",
905 explanation="Was unable to set the HA role (master or slave) " +
906 "for the controller.",
907 recommendation=LogMessageDoc.CHECK_CONTROLLER)
908 protected void handleRoleReplyMessage(OFVendor vendorMessage,
909 OFRoleReplyVendorData roleReplyVendorData) {
910 // Map from the role code in the message to our role enum
911 int nxRole = roleReplyVendorData.getRole();
912 Role role = null;
913 switch (nxRole) {
914 case OFRoleVendorData.NX_ROLE_OTHER:
915 role = Role.EQUAL;
916 break;
917 case OFRoleVendorData.NX_ROLE_MASTER:
918 role = Role.MASTER;
919 break;
920 case OFRoleVendorData.NX_ROLE_SLAVE:
921 role = Role.SLAVE;
922 break;
923 default:
924 log.error("Invalid role value in role reply message");
925 sw.getChannel().close();
926 return;
927 }
928
929 log.debug("Handling role reply for role {} from {}. " +
930 "Controller's role is {} ",
931 new Object[] { role, sw, Controller.this.role}
932 );
933
934 sw.deliverRoleReply(vendorMessage.getXid(), role);
935
936 boolean isActive = activeSwitches.containsKey(sw.getId());
937 if (!isActive && sw.isActive()) {
938 // Transition from SLAVE to MASTER.
939
940 if (!state.firstRoleReplyReceived ||
941 getAlwaysClearFlowsOnSwAdd()) {
942 // This is the first role-reply message we receive from
943 // this switch or roles were disabled when the switch
944 // connected:
945 // Delete all pre-existing flows for new connections to
946 // the master
947 //
948 // FIXME: Need to think more about what the test should
949 // be for when we flush the flow-table? For example,
950 // if all the controllers are temporarily in the backup
951 // role (e.g. right after a failure of the master
952 // controller) at the point the switch connects, then
953 // all of the controllers will initially connect as
954 // backup controllers and not flush the flow-table.
955 // Then when one of them is promoted to master following
956 // the master controller election the flow-table
957 // will still not be flushed because that's treated as
958 // a failover event where we don't want to flush the
959 // flow-table. The end result would be that the flow
960 // table for a newly connected switch is never
961 // flushed. Not sure how to handle that case though...
962 sw.clearAllFlowMods();
963 log.debug("First role reply from master switch {}, " +
964 "clear FlowTable to active switch list",
965 HexString.toHexString(sw.getId()));
966 }
967
968 // Some switches don't seem to update us with port
969 // status messages while in slave role.
970 readSwitchPortStateFromStorage(sw);
971
972 // Only add the switch to the active switch list if
973 // we're not in the slave role. Note that if the role
974 // attribute is null, then that means that the switch
975 // doesn't support the role request messages, so in that
976 // case we're effectively in the EQUAL role and the
977 // switch should be included in the active switch list.
978 addSwitch(sw);
979 log.debug("Added master switch {} to active switch list",
980 HexString.toHexString(sw.getId()));
981
982 }
983 else if (isActive && !sw.isActive()) {
984 // Transition from MASTER to SLAVE: remove switch
985 // from active switch list.
986 log.debug("Removed slave switch {} from active switch" +
987 " list", HexString.toHexString(sw.getId()));
988 removeSwitch(sw);
989 }
990
991 // Indicate that we have received a role reply message.
992 state.firstRoleReplyReceived = true;
993 }
994
995 protected boolean handleVendorMessage(OFVendor vendorMessage) {
996 boolean shouldHandleMessage = false;
997 int vendor = vendorMessage.getVendor();
998 switch (vendor) {
999 case OFNiciraVendorData.NX_VENDOR_ID:
1000 OFNiciraVendorData niciraVendorData =
1001 (OFNiciraVendorData)vendorMessage.getVendorData();
1002 int dataType = niciraVendorData.getDataType();
1003 switch (dataType) {
1004 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
1005 OFRoleReplyVendorData roleReplyVendorData =
1006 (OFRoleReplyVendorData) niciraVendorData;
1007 handleRoleReplyMessage(vendorMessage,
1008 roleReplyVendorData);
1009 break;
1010 default:
1011 log.warn("Unhandled Nicira VENDOR message; " +
1012 "data type = {}", dataType);
1013 break;
1014 }
1015 break;
1016 default:
1017 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
1018 break;
1019 }
1020
1021 return shouldHandleMessage;
1022 }
1023
1024 /**
1025 * Dispatch an Openflow message from a switch to the appropriate
1026 * handler.
1027 * @param m The message to process
1028 * @throws IOException
1029 * @throws SwitchStateException
1030 */
1031 @LogMessageDocs({
1032 @LogMessageDoc(level="WARN",
1033 message="Config Reply from {switch} has " +
1034 "miss length set to {length}",
1035 explanation="The controller requires that the switch " +
1036 "use a miss length of 0xffff for correct " +
1037 "function",
1038 recommendation="Use a different switch to ensure " +
1039 "correct function"),
1040 @LogMessageDoc(level="WARN",
1041 message="Received ERROR from sw {switch} that "
1042 +"indicates roles are not supported "
1043 +"but we have received a valid "
1044 +"role reply earlier",
1045 explanation="The switch sent a confusing message to the" +
1046 "controller")
1047 })
1048 protected void processOFMessage(OFMessage m)
1049 throws IOException, SwitchStateException {
1050 boolean shouldHandleMessage = false;
1051
1052 switch (m.getType()) {
1053 case HELLO:
1054 if (log.isTraceEnabled())
1055 log.trace("HELLO from {}", sw);
1056
1057 if (state.hsState.equals(HandshakeState.START)) {
1058 state.hsState = HandshakeState.HELLO;
1059 sendHelloConfiguration();
1060 } else {
1061 throw new SwitchStateException("Unexpected HELLO from "
1062 + sw);
1063 }
1064 break;
1065 case ECHO_REQUEST:
1066 OFEchoReply reply =
1067 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
1068 reply.setXid(m.getXid());
1069 sw.write(reply, null);
1070 break;
1071 case ECHO_REPLY:
1072 break;
1073 case FEATURES_REPLY:
1074 if (log.isTraceEnabled())
1075 log.trace("Features Reply from {}", sw);
1076
1077 sw.setFeaturesReply((OFFeaturesReply) m);
1078 if (state.hsState.equals(HandshakeState.HELLO)) {
1079 sendFeatureReplyConfiguration();
1080 state.hsState = HandshakeState.FEATURES_REPLY;
1081 // uncomment to enable "dumb" switches like cbench
1082 // state.hsState = HandshakeState.READY;
1083 // addSwitch(sw);
1084 } else {
1085 // return results to rest api caller
1086 sw.deliverOFFeaturesReply(m);
1087 // update database */
1088 updateActiveSwitchInfo(sw);
1089 }
1090 break;
1091 case GET_CONFIG_REPLY:
1092 if (log.isTraceEnabled())
1093 log.trace("Get config reply from {}", sw);
1094
1095 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
1096 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
1097 throw new SwitchStateException(em);
1098 }
1099 OFGetConfigReply cr = (OFGetConfigReply) m;
1100 if (cr.getMissSendLength() == (short)0xffff) {
1101 log.trace("Config Reply from {} confirms " +
1102 "miss length set to 0xffff", sw);
1103 } else {
1104 log.warn("Config Reply from {} has " +
1105 "miss length set to {}",
1106 sw, cr.getMissSendLength() & 0xffff);
1107 }
1108 state.hasGetConfigReply = true;
1109 checkSwitchReady();
1110 break;
1111 case VENDOR:
1112 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1113 break;
1114 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001115 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001116 // TODO: we need better error handling. Especially for
1117 // request/reply style message (stats, roles) we should have
1118 // a unified way to lookup the xid in the error message.
1119 // This will probable involve rewriting the way we handle
1120 // request/reply style messages.
1121 OFError error = (OFError) m;
1122 boolean shouldLogError = true;
1123 // TODO: should we check that firstRoleReplyReceived is false,
1124 // i.e., check only whether the first request fails?
1125 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1126 boolean isBadVendorError =
1127 (error.getErrorType() == OFError.OFErrorType.
1128 OFPET_BAD_REQUEST.getValue());
1129 // We expect to receive a bad vendor error when
1130 // we're connected to a switch that doesn't support
1131 // the Nicira vendor extensions (i.e. not OVS or
1132 // derived from OVS). By protocol, it should also be
1133 // BAD_VENDOR, but too many switch implementations
1134 // get it wrong and we can already check the xid()
1135 // so we can ignore the type with confidence that this
1136 // is not a spurious error
1137 shouldLogError = !isBadVendorError;
1138 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001139 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001140 if (state.firstRoleReplyReceived && (role != null)) {
1141 log.warn("Received ERROR from sw {} that "
1142 +"indicates roles are not supported "
1143 +"but we have received a valid "
1144 +"role reply earlier", sw);
1145 }
1146 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001147 Role requestedRole =
HIGUCHI Yutaeae374d2013-06-17 10:39:42 -07001148 sw.deliverRoleRequestNotSupportedEx(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001149 synchronized(roleChanger) {
1150 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001151 //This will now never happen. The Controller's role
1152 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001153 // the switch doesn't understand role request
1154 // messages and the current controller role is
1155 // slave. We need to disconnect the switch.
1156 // @see RoleChanger for rationale
Jonathan Hart9e92c512013-03-20 16:24:44 -07001157 log.warn("Closing {} channel because controller's role " +
1158 "is SLAVE", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001159 sw.getChannel().close();
1160 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001161 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001162 log.debug("Adding switch {} because we got an error" +
1163 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001164 // Controller's role is master: add to
1165 // active
1166 // TODO: check if clearing flow table is
1167 // right choice here.
1168 // Need to clear FlowMods before we add the switch
1169 // and dispatch updates otherwise we have a race condition.
1170 // TODO: switch update is async. Won't we still have a potential
1171 // race condition?
1172 sw.clearAllFlowMods();
1173 addSwitch(sw);
1174 }
1175 }
1176 }
1177 else {
1178 // TODO: Is this the right thing to do if we receive
1179 // some other error besides a bad vendor error?
1180 // Presumably that means the switch did actually
1181 // understand the role request message, but there
1182 // was some other error from processing the message.
1183 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1184 // error code, but it doesn't look like the Nicira
1185 // role request has that. Should check OVS source
1186 // code to see if it's possible for any other errors
1187 // to be returned.
1188 // If we received an error the switch is not
1189 // in the correct role, so we need to disconnect it.
1190 // We could also resend the request but then we need to
1191 // check if there are other pending request in which
1192 // case we shouldn't resend. If we do resend we need
1193 // to make sure that the switch eventually accepts one
1194 // of our requests or disconnect the switch. This feels
1195 // cumbersome.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001196 log.debug("Closing {} channel because we recieved an " +
1197 "error other than BAD_VENDOR", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001198 sw.getChannel().close();
1199 }
1200 }
1201 // Once we support OF 1.2, we'd add code to handle it here.
1202 //if (error.getXid() == state.ofRoleRequestXid) {
1203 //}
1204 if (shouldLogError)
1205 logError(sw, error);
1206 break;
1207 case STATS_REPLY:
1208 if (state.hsState.ordinal() <
1209 HandshakeState.FEATURES_REPLY.ordinal()) {
1210 String em = "Unexpected STATS_REPLY from " + sw;
1211 throw new SwitchStateException(em);
1212 }
1213 sw.deliverStatisticsReply(m);
1214 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1215 processSwitchDescReply();
1216 }
1217 break;
1218 case PORT_STATUS:
1219 // We want to update our port state info even if we're in
1220 // the slave role, but we only want to update storage if
1221 // we're the master (or equal).
1222 boolean updateStorage = state.hsState.
1223 equals(HandshakeState.READY) &&
1224 (sw.getRole() != Role.SLAVE);
1225 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1226 shouldHandleMessage = true;
1227 break;
1228
1229 default:
1230 shouldHandleMessage = true;
1231 break;
1232 }
1233
1234 if (shouldHandleMessage) {
1235 sw.getListenerReadLock().lock();
1236 try {
1237 if (sw.isConnected()) {
1238 if (!state.hsState.equals(HandshakeState.READY)) {
1239 log.debug("Ignoring message type {} received " +
1240 "from switch {} before switch is " +
1241 "fully configured.", m.getType(), sw);
1242 }
1243 // Check if the controller is in the slave role for the
1244 // switch. If it is, then don't dispatch the message to
1245 // the listeners.
1246 // TODO: Should we dispatch messages that we expect to
1247 // receive when we're in the slave role, e.g. port
1248 // status messages? Since we're "hiding" switches from
1249 // the listeners when we're in the slave role, then it
1250 // seems a little weird to dispatch port status messages
1251 // to them. On the other hand there might be special
1252 // modules that care about all of the connected switches
1253 // and would like to receive port status notifications.
1254 else if (sw.getRole() == Role.SLAVE) {
1255 // Don't log message if it's a port status message
1256 // since we expect to receive those from the switch
1257 // and don't want to emit spurious messages.
1258 if (m.getType() != OFType.PORT_STATUS) {
1259 log.debug("Ignoring message type {} received " +
1260 "from switch {} while in the slave role.",
1261 m.getType(), sw);
1262 }
1263 } else {
1264 handleMessage(sw, m, null);
1265 }
1266 }
1267 }
1268 finally {
1269 sw.getListenerReadLock().unlock();
1270 }
1271 }
1272 }
1273 }
1274
1275 // ****************
1276 // Message handlers
1277 // ****************
1278
1279 protected void handlePortStatusMessage(IOFSwitch sw,
1280 OFPortStatus m,
1281 boolean updateStorage) {
1282 short portNumber = m.getDesc().getPortNumber();
1283 OFPhysicalPort port = m.getDesc();
1284 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001285 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1286 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001287 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001288 if (!portDown) {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001289 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1290 try {
1291 this.updates.put(update);
1292 } catch (InterruptedException e) {
1293 log.error("Failure adding update to queue", e);
1294 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001295 } else {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001296 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1297 try {
1298 this.updates.put(update);
1299 } catch (InterruptedException e) {
1300 log.error("Failure adding update to queue", e);
1301 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001302 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001303 if (updateStorage)
1304 updatePortInfo(sw, port);
1305 log.debug("Port #{} modified for {}", portNumber, sw);
1306 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1307 sw.setPort(port);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001308 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1309 try {
1310 this.updates.put(update);
1311 } catch (InterruptedException e) {
1312 log.error("Failure adding update to queue", e);
1313 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001314 if (updateStorage)
1315 updatePortInfo(sw, port);
1316 log.debug("Port #{} added for {}", portNumber, sw);
1317 } else if (m.getReason() ==
1318 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1319 sw.deletePort(portNumber);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001320 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1321 try {
1322 this.updates.put(update);
1323 } catch (InterruptedException e) {
1324 log.error("Failure adding update to queue", e);
1325 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001326 if (updateStorage)
1327 removePortInfo(sw, portNumber);
1328 log.debug("Port #{} deleted for {}", portNumber, sw);
1329 }
1330 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1331 try {
1332 this.updates.put(update);
1333 } catch (InterruptedException e) {
1334 log.error("Failure adding update to queue", e);
1335 }
1336 }
1337
1338 /**
1339 * flcontext_cache - Keep a thread local stack of contexts
1340 */
1341 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1342 new ThreadLocal <Stack<FloodlightContext>> () {
1343 @Override
1344 protected Stack<FloodlightContext> initialValue() {
1345 return new Stack<FloodlightContext>();
1346 }
1347 };
1348
1349 /**
1350 * flcontext_alloc - pop a context off the stack, if required create a new one
1351 * @return FloodlightContext
1352 */
1353 protected static FloodlightContext flcontext_alloc() {
1354 FloodlightContext flcontext = null;
1355
1356 if (flcontext_cache.get().empty()) {
1357 flcontext = new FloodlightContext();
1358 }
1359 else {
1360 flcontext = flcontext_cache.get().pop();
1361 }
1362
1363 return flcontext;
1364 }
1365
1366 /**
1367 * flcontext_free - Free the context to the current thread
1368 * @param flcontext
1369 */
1370 protected void flcontext_free(FloodlightContext flcontext) {
1371 flcontext.getStorage().clear();
1372 flcontext_cache.get().push(flcontext);
1373 }
1374
1375 /**
1376 * Handle replies to certain OFMessages, and pass others off to listeners
1377 * @param sw The switch for the message
1378 * @param m The message
1379 * @param bContext The floodlight context. If null then floodlight context would
1380 * be allocated in this function
1381 * @throws IOException
1382 */
1383 @LogMessageDocs({
1384 @LogMessageDoc(level="ERROR",
1385 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1386 " field is empty.",
1387 explanation="The switch sent an improperly-formatted PacketIn" +
1388 " message",
1389 recommendation=LogMessageDoc.CHECK_SWITCH),
1390 @LogMessageDoc(level="WARN",
1391 message="Unhandled OF Message: {} from {}",
1392 explanation="The switch sent a message not handled by " +
1393 "the controller")
1394 })
1395 protected void handleMessage(IOFSwitch sw, OFMessage m,
1396 FloodlightContext bContext)
1397 throws IOException {
1398 Ethernet eth = null;
1399
1400 switch (m.getType()) {
1401 case PACKET_IN:
1402 OFPacketIn pi = (OFPacketIn)m;
1403
1404 if (pi.getPacketData().length <= 0) {
1405 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1406 ") because the data field is empty.");
1407 return;
1408 }
1409
1410 if (Controller.ALWAYS_DECODE_ETH) {
1411 eth = new Ethernet();
1412 eth.deserialize(pi.getPacketData(), 0,
1413 pi.getPacketData().length);
1414 counterStore.updatePacketInCounters(sw, m, eth);
1415 }
1416 // fall through to default case...
1417
1418 default:
1419
1420 List<IOFMessageListener> listeners = null;
1421 if (messageListeners.containsKey(m.getType())) {
1422 listeners = messageListeners.get(m.getType()).
1423 getOrderedListeners();
1424 }
1425
1426 FloodlightContext bc = null;
1427 if (listeners != null) {
1428 // Check if floodlight context is passed from the calling
1429 // function, if so use that floodlight context, otherwise
1430 // allocate one
1431 if (bContext == null) {
1432 bc = flcontext_alloc();
1433 } else {
1434 bc = bContext;
1435 }
1436 if (eth != null) {
1437 IFloodlightProviderService.bcStore.put(bc,
1438 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1439 eth);
1440 }
1441
1442 // Get the starting time (overall and per-component) of
1443 // the processing chain for this packet if performance
1444 // monitoring is turned on
1445 pktinProcTime.bootstrap(listeners);
1446 pktinProcTime.recordStartTimePktIn();
1447 Command cmd;
1448 for (IOFMessageListener listener : listeners) {
1449 if (listener instanceof IOFSwitchFilter) {
1450 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1451 continue;
1452 }
1453 }
1454
1455 pktinProcTime.recordStartTimeComp(listener);
1456 cmd = listener.receive(sw, m, bc);
1457 pktinProcTime.recordEndTimeComp(listener);
1458
1459 if (Command.STOP.equals(cmd)) {
1460 break;
1461 }
1462 }
1463 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1464 } else {
1465 log.warn("Unhandled OF Message: {} from {}", m, sw);
1466 }
1467
1468 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1469 }
1470 }
1471
1472 /**
1473 * Log an OpenFlow error message from a switch
1474 * @param sw The switch that sent the error
1475 * @param error The error message
1476 */
1477 @LogMessageDoc(level="ERROR",
1478 message="Error {error type} {error code} from {switch}",
1479 explanation="The switch responded with an unexpected error" +
1480 "to an OpenFlow message from the controller",
1481 recommendation="This could indicate improper network operation. " +
1482 "If the problem persists restarting the switch and " +
1483 "controller may help."
1484 )
1485 protected void logError(IOFSwitch sw, OFError error) {
1486 int etint = 0xffff & error.getErrorType();
1487 if (etint < 0 || etint >= OFErrorType.values().length) {
1488 log.error("Unknown error code {} from sw {}", etint, sw);
1489 }
1490 OFErrorType et = OFErrorType.values()[etint];
1491 switch (et) {
1492 case OFPET_HELLO_FAILED:
1493 OFHelloFailedCode hfc =
1494 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1495 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1496 break;
1497 case OFPET_BAD_REQUEST:
1498 OFBadRequestCode brc =
1499 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1500 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1501 break;
1502 case OFPET_BAD_ACTION:
1503 OFBadActionCode bac =
1504 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1505 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1506 break;
1507 case OFPET_FLOW_MOD_FAILED:
1508 OFFlowModFailedCode fmfc =
1509 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1510 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1511 break;
1512 case OFPET_PORT_MOD_FAILED:
1513 OFPortModFailedCode pmfc =
1514 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1515 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1516 break;
1517 case OFPET_QUEUE_OP_FAILED:
1518 OFQueueOpFailedCode qofc =
1519 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1520 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1521 break;
1522 default:
1523 break;
1524 }
1525 }
1526
1527 /**
1528 * Add a switch to the active switch list and call the switch listeners.
1529 * This happens either when a switch first connects (and the controller is
1530 * not in the slave role) or when the role of the controller changes from
1531 * slave to master.
1532 * @param sw the switch that has been added
1533 */
1534 // TODO: need to rethink locking and the synchronous switch update.
1535 // We can / should also handle duplicate DPIDs in connectedSwitches
1536 @LogMessageDoc(level="ERROR",
1537 message="New switch added {switch} for already-added switch {switch}",
1538 explanation="A switch with the same DPID as another switch " +
1539 "connected to the controller. This can be caused by " +
1540 "multiple switches configured with the same DPID, or " +
1541 "by a switch reconnected very quickly after " +
1542 "disconnecting.",
1543 recommendation="If this happens repeatedly, it is likely there " +
1544 "are switches with duplicate DPIDs on the network. " +
1545 "Reconfigure the appropriate switches. If it happens " +
1546 "very rarely, then it is likely this is a transient " +
1547 "network problem that can be ignored."
1548 )
1549 protected void addSwitch(IOFSwitch sw) {
1550 // TODO: is it safe to modify the HashMap without holding
1551 // the old switch's lock?
1552 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1553 if (sw == oldSw) {
1554 // Note == for object equality, not .equals for value
1555 log.info("New add switch for pre-existing switch {}", sw);
1556 return;
1557 }
1558
1559 if (oldSw != null) {
1560 oldSw.getListenerWriteLock().lock();
1561 try {
1562 log.error("New switch added {} for already-added switch {}",
1563 sw, oldSw);
1564 // Set the connected flag to false to suppress calling
1565 // the listeners for this switch in processOFMessage
1566 oldSw.setConnected(false);
1567
1568 oldSw.cancelAllStatisticsReplies();
1569
1570 updateInactiveSwitchInfo(oldSw);
1571
1572 // we need to clean out old switch state definitively
1573 // before adding the new switch
1574 // FIXME: It seems not completely kosher to call the
1575 // switch listeners here. I thought one of the points of
1576 // having the asynchronous switch update mechanism was so
1577 // the addedSwitch and removedSwitch were always called
1578 // from a single thread to simplify concurrency issues
1579 // for the listener.
1580 if (switchListeners != null) {
1581 for (IOFSwitchListener listener : switchListeners) {
1582 listener.removedSwitch(oldSw);
1583 }
1584 }
1585 // will eventually trigger a removeSwitch(), which will cause
1586 // a "Not removing Switch ... already removed debug message.
1587 // TODO: Figure out a way to handle this that avoids the
1588 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001589 log.debug("Closing {} because a new IOFSwitch got added " +
1590 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001591 oldSw.getChannel().close();
1592 }
1593 finally {
1594 oldSw.getListenerWriteLock().unlock();
1595 }
1596 }
1597
1598 updateActiveSwitchInfo(sw);
1599 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1600 try {
1601 this.updates.put(update);
1602 } catch (InterruptedException e) {
1603 log.error("Failure adding update to queue", e);
1604 }
1605 }
1606
1607 /**
1608 * Remove a switch from the active switch list and call the switch listeners.
1609 * This happens either when the switch is disconnected or when the
1610 * controller's role for the switch changes from master to slave.
1611 * @param sw the switch that has been removed
1612 */
1613 protected void removeSwitch(IOFSwitch sw) {
1614 // No need to acquire the listener lock, since
1615 // this method is only called after netty has processed all
1616 // pending messages
1617 log.debug("removeSwitch: {}", sw);
1618 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1619 log.debug("Not removing switch {}; already removed", sw);
1620 return;
1621 }
1622 // We cancel all outstanding statistics replies if the switch transition
1623 // from active. In the future we might allow statistics requests
1624 // from slave controllers. Then we need to move this cancelation
1625 // to switch disconnect
1626 sw.cancelAllStatisticsReplies();
1627
1628 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1629 // here if role support is enabled. In that case if the switch is being
1630 // removed because we've been switched to being in the slave role, then I think
1631 // it's possible that the new master may have already been promoted to master
1632 // and written out the active switch state to storage. If we now execute
1633 // updateInactiveSwitchInfo we may wipe out all of the state that was
1634 // written out by the new master. Maybe need to revisit how we handle all
1635 // of the switch state that's written to storage.
1636
1637 updateInactiveSwitchInfo(sw);
1638 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1639 try {
1640 this.updates.put(update);
1641 } catch (InterruptedException e) {
1642 log.error("Failure adding update to queue", e);
1643 }
1644 }
1645
1646 // ***************
1647 // IFloodlightProvider
1648 // ***************
1649
1650 @Override
1651 public synchronized void addOFMessageListener(OFType type,
1652 IOFMessageListener listener) {
1653 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1654 messageListeners.get(type);
1655 if (ldd == null) {
1656 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1657 messageListeners.put(type, ldd);
1658 }
1659 ldd.addListener(type, listener);
1660 }
1661
1662 @Override
1663 public synchronized void removeOFMessageListener(OFType type,
1664 IOFMessageListener listener) {
1665 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1666 messageListeners.get(type);
1667 if (ldd != null) {
1668 ldd.removeListener(listener);
1669 }
1670 }
1671
1672 private void logListeners() {
1673 for (Map.Entry<OFType,
1674 ListenerDispatcher<OFType,
1675 IOFMessageListener>> entry
1676 : messageListeners.entrySet()) {
1677
1678 OFType type = entry.getKey();
1679 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1680 entry.getValue();
1681
1682 StringBuffer sb = new StringBuffer();
1683 sb.append("OFListeners for ");
1684 sb.append(type);
1685 sb.append(": ");
1686 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1687 sb.append(l.getName());
1688 sb.append(",");
1689 }
1690 log.debug(sb.toString());
1691 }
1692 }
1693
1694 public void removeOFMessageListeners(OFType type) {
1695 messageListeners.remove(type);
1696 }
1697
1698 @Override
1699 public Map<Long, IOFSwitch> getSwitches() {
1700 return Collections.unmodifiableMap(this.activeSwitches);
1701 }
1702
1703 @Override
1704 public void addOFSwitchListener(IOFSwitchListener listener) {
1705 this.switchListeners.add(listener);
1706 }
1707
1708 @Override
1709 public void removeOFSwitchListener(IOFSwitchListener listener) {
1710 this.switchListeners.remove(listener);
1711 }
1712
1713 @Override
1714 public Map<OFType, List<IOFMessageListener>> getListeners() {
1715 Map<OFType, List<IOFMessageListener>> lers =
1716 new HashMap<OFType, List<IOFMessageListener>>();
1717 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1718 messageListeners.entrySet()) {
1719 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1720 }
1721 return Collections.unmodifiableMap(lers);
1722 }
1723
1724 @Override
1725 @LogMessageDocs({
1726 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1727 "a null switch",
1728 explanation="Failed to process a message because the switch " +
1729 " is no longer connected."),
1730 @LogMessageDoc(level="ERROR",
1731 message="Error reinjecting OFMessage on switch {switch}",
1732 explanation="An I/O error occured while attempting to " +
1733 "process an OpenFlow message",
1734 recommendation=LogMessageDoc.CHECK_SWITCH)
1735 })
1736 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1737 FloodlightContext bc) {
1738 if (sw == null) {
1739 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1740 return false;
1741 }
1742
1743 // FIXME: Do we need to be able to inject messages to switches
1744 // where we're the slave controller (i.e. they're connected but
1745 // not active)?
1746 // FIXME: Don't we need synchronization logic here so we're holding
1747 // the listener read lock when we call handleMessage? After some
1748 // discussions it sounds like the right thing to do here would be to
1749 // inject the message as a netty upstream channel event so it goes
1750 // through the normal netty event processing, including being
1751 // handled
1752 if (!activeSwitches.containsKey(sw.getId())) return false;
1753
1754 try {
1755 // Pass Floodlight context to the handleMessages()
1756 handleMessage(sw, msg, bc);
1757 } catch (IOException e) {
1758 log.error("Error reinjecting OFMessage on switch {}",
1759 HexString.toHexString(sw.getId()));
1760 return false;
1761 }
1762 return true;
1763 }
1764
1765 @Override
1766 @LogMessageDoc(message="Calling System.exit",
1767 explanation="The controller is terminating")
1768 public synchronized void terminate() {
1769 log.info("Calling System.exit");
1770 System.exit(1);
1771 }
1772
1773 @Override
1774 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1775 // call the overloaded version with floodlight context set to null
1776 return injectOfMessage(sw, msg, null);
1777 }
1778
1779 @Override
1780 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1781 FloodlightContext bc) {
1782 if (log.isTraceEnabled()) {
1783 String str = OFMessage.getDataAsString(sw, m, bc);
1784 log.trace("{}", str);
1785 }
1786
1787 List<IOFMessageListener> listeners = null;
1788 if (messageListeners.containsKey(m.getType())) {
1789 listeners =
1790 messageListeners.get(m.getType()).getOrderedListeners();
1791 }
1792
1793 if (listeners != null) {
1794 for (IOFMessageListener listener : listeners) {
1795 if (listener instanceof IOFSwitchFilter) {
1796 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1797 continue;
1798 }
1799 }
1800 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1801 break;
1802 }
1803 }
1804 }
1805 }
1806
1807 @Override
1808 public BasicFactory getOFMessageFactory() {
1809 return factory;
1810 }
1811
1812 @Override
1813 public String getControllerId() {
1814 return controllerId;
1815 }
1816
1817 // **************
1818 // Initialization
1819 // **************
1820
1821 protected void updateAllInactiveSwitchInfo() {
1822 if (role == Role.SLAVE) {
1823 return;
1824 }
1825 String controllerId = getControllerId();
1826 String[] switchColumns = { SWITCH_DATAPATH_ID,
1827 SWITCH_CONTROLLER_ID,
1828 SWITCH_ACTIVE };
1829 String[] portColumns = { PORT_ID, PORT_SWITCH };
1830 IResultSet switchResultSet = null;
1831 try {
1832 OperatorPredicate op =
1833 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1834 OperatorPredicate.Operator.EQ,
1835 controllerId);
1836 switchResultSet =
1837 storageSource.executeQuery(SWITCH_TABLE_NAME,
1838 switchColumns,
1839 op, null);
1840 while (switchResultSet.next()) {
1841 IResultSet portResultSet = null;
1842 try {
1843 String datapathId =
1844 switchResultSet.getString(SWITCH_DATAPATH_ID);
1845 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1846 op = new OperatorPredicate(PORT_SWITCH,
1847 OperatorPredicate.Operator.EQ,
1848 datapathId);
1849 portResultSet =
1850 storageSource.executeQuery(PORT_TABLE_NAME,
1851 portColumns,
1852 op, null);
1853 while (portResultSet.next()) {
1854 portResultSet.deleteRow();
1855 }
1856 portResultSet.save();
1857 }
1858 finally {
1859 if (portResultSet != null)
1860 portResultSet.close();
1861 }
1862 }
1863 switchResultSet.save();
1864 }
1865 finally {
1866 if (switchResultSet != null)
1867 switchResultSet.close();
1868 }
1869 }
1870
1871 protected void updateControllerInfo() {
1872 updateAllInactiveSwitchInfo();
1873
1874 // Write out the controller info to the storage source
1875 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1876 String id = getControllerId();
1877 controllerInfo.put(CONTROLLER_ID, id);
1878 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1879 }
1880
1881 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1882 if (role == Role.SLAVE) {
1883 return;
1884 }
1885 // Obtain the row info for the switch
1886 Map<String, Object> switchInfo = new HashMap<String, Object>();
1887 String datapathIdString = sw.getStringId();
1888 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1889 String controllerId = getControllerId();
1890 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1891 Date connectedSince = sw.getConnectedSince();
1892 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1893 Channel channel = sw.getChannel();
1894 SocketAddress socketAddress = channel.getRemoteAddress();
1895 if (socketAddress != null) {
1896 String socketAddressString = socketAddress.toString();
1897 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1898 if (socketAddress instanceof InetSocketAddress) {
1899 InetSocketAddress inetSocketAddress =
1900 (InetSocketAddress)socketAddress;
1901 InetAddress inetAddress = inetSocketAddress.getAddress();
1902 String ip = inetAddress.getHostAddress();
1903 switchInfo.put(SWITCH_IP, ip);
1904 }
1905 }
1906
1907 // Write out the switch features info
1908 long capabilities = U32.f(sw.getCapabilities());
1909 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1910 long buffers = U32.f(sw.getBuffers());
1911 switchInfo.put(SWITCH_BUFFERS, buffers);
1912 long tables = U32.f(sw.getTables());
1913 switchInfo.put(SWITCH_TABLES, tables);
1914 long actions = U32.f(sw.getActions());
1915 switchInfo.put(SWITCH_ACTIONS, actions);
1916 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1917
1918 // Update the switch
1919 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1920
1921 // Update the ports
1922 for (OFPhysicalPort port: sw.getPorts()) {
1923 updatePortInfo(sw, port);
1924 }
1925 }
1926
1927 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1928 if (role == Role.SLAVE) {
1929 return;
1930 }
1931 log.debug("Update DB with inactiveSW {}", sw);
1932 // Update the controller info in the storage source to be inactive
1933 Map<String, Object> switchInfo = new HashMap<String, Object>();
1934 String datapathIdString = sw.getStringId();
1935 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1936 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1937 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1938 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1939 }
1940
1941 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1942 if (role == Role.SLAVE) {
1943 return;
1944 }
1945 String datapathIdString = sw.getStringId();
1946 Map<String, Object> portInfo = new HashMap<String, Object>();
1947 int portNumber = U16.f(port.getPortNumber());
1948 String id = datapathIdString + "|" + portNumber;
1949 portInfo.put(PORT_ID, id);
1950 portInfo.put(PORT_SWITCH, datapathIdString);
1951 portInfo.put(PORT_NUMBER, portNumber);
1952 byte[] hardwareAddress = port.getHardwareAddress();
1953 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1954 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1955 String name = port.getName();
1956 portInfo.put(PORT_NAME, name);
1957 long config = U32.f(port.getConfig());
1958 portInfo.put(PORT_CONFIG, config);
1959 long state = U32.f(port.getState());
1960 portInfo.put(PORT_STATE, state);
1961 long currentFeatures = U32.f(port.getCurrentFeatures());
1962 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1963 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1964 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1965 long supportedFeatures = U32.f(port.getSupportedFeatures());
1966 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1967 long peerFeatures = U32.f(port.getPeerFeatures());
1968 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1969 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1970 }
1971
1972 /**
1973 * Read switch port data from storage and write it into a switch object
1974 * @param sw the switch to update
1975 */
1976 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1977 OperatorPredicate op =
1978 new OperatorPredicate(PORT_SWITCH,
1979 OperatorPredicate.Operator.EQ,
1980 sw.getStringId());
1981 IResultSet portResultSet =
1982 storageSource.executeQuery(PORT_TABLE_NAME,
1983 null, op, null);
1984 //Map<Short, OFPhysicalPort> oldports =
1985 // new HashMap<Short, OFPhysicalPort>();
1986 //oldports.putAll(sw.getPorts());
1987
1988 while (portResultSet.next()) {
1989 try {
1990 OFPhysicalPort p = new OFPhysicalPort();
1991 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1992 p.setName(portResultSet.getString(PORT_NAME));
1993 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1994 p.setState((int)portResultSet.getLong(PORT_STATE));
1995 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1996 p.setHardwareAddress(HexString.fromHexString(portMac));
1997 p.setCurrentFeatures((int)portResultSet.
1998 getLong(PORT_CURRENT_FEATURES));
1999 p.setAdvertisedFeatures((int)portResultSet.
2000 getLong(PORT_ADVERTISED_FEATURES));
2001 p.setSupportedFeatures((int)portResultSet.
2002 getLong(PORT_SUPPORTED_FEATURES));
2003 p.setPeerFeatures((int)portResultSet.
2004 getLong(PORT_PEER_FEATURES));
2005 //oldports.remove(Short.valueOf(p.getPortNumber()));
2006 sw.setPort(p);
2007 } catch (NullPointerException e) {
2008 // ignore
2009 }
2010 }
2011 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
2012 try {
2013 this.updates.put(update);
2014 } catch (InterruptedException e) {
2015 log.error("Failure adding update to queue", e);
2016 }
2017 }
2018
2019 protected void removePortInfo(IOFSwitch sw, short portNumber) {
2020 if (role == Role.SLAVE) {
2021 return;
2022 }
2023 String datapathIdString = sw.getStringId();
2024 String id = datapathIdString + "|" + portNumber;
2025 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
2026 }
2027
2028 /**
2029 * Sets the initial role based on properties in the config params.
2030 * It looks for two different properties.
2031 * If the "role" property is specified then the value should be
2032 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
2033 * controller is set to the specified value. If the "role" property
2034 * is not specified then it looks next for the "role.path" property.
2035 * In this case the value should be the path to a property file in
2036 * the file system that contains a property called "floodlight.role"
2037 * which can be one of the values listed above for the "role" property.
2038 * The idea behind the "role.path" mechanism is that you have some
2039 * separate heartbeat and master controller election algorithm that
2040 * determines the role of the controller. When a role transition happens,
2041 * it updates the current role in the file specified by the "role.path"
2042 * file. Then if floodlight restarts for some reason it can get the
2043 * correct current role of the controller from the file.
2044 * @param configParams The config params for the FloodlightProvider service
2045 * @return A valid role if role information is specified in the
2046 * config params, otherwise null
2047 */
2048 @LogMessageDocs({
2049 @LogMessageDoc(message="Controller role set to {role}",
2050 explanation="Setting the initial HA role to "),
2051 @LogMessageDoc(level="ERROR",
2052 message="Invalid current role value: {role}",
2053 explanation="An invalid HA role value was read from the " +
2054 "properties file",
2055 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2056 })
2057 protected Role getInitialRole(Map<String, String> configParams) {
2058 Role role = null;
2059 String roleString = configParams.get("role");
2060 if (roleString == null) {
2061 String rolePath = configParams.get("rolepath");
2062 if (rolePath != null) {
2063 Properties properties = new Properties();
2064 try {
2065 properties.load(new FileInputStream(rolePath));
2066 roleString = properties.getProperty("floodlight.role");
2067 }
2068 catch (IOException exc) {
2069 // Don't treat it as an error if the file specified by the
2070 // rolepath property doesn't exist. This lets us enable the
2071 // HA mechanism by just creating/setting the floodlight.role
2072 // property in that file without having to modify the
2073 // floodlight properties.
2074 }
2075 }
2076 }
2077
2078 if (roleString != null) {
2079 // Canonicalize the string to the form used for the enum constants
2080 roleString = roleString.trim().toUpperCase();
2081 try {
2082 role = Role.valueOf(roleString);
2083 }
2084 catch (IllegalArgumentException exc) {
2085 log.error("Invalid current role value: {}", roleString);
2086 }
2087 }
2088
2089 log.info("Controller role set to {}", role);
2090
2091 return role;
2092 }
2093
2094 /**
2095 * Tell controller that we're ready to accept switches loop
2096 * @throws IOException
2097 */
2098 @LogMessageDocs({
2099 @LogMessageDoc(message="Listening for switch connections on {address}",
2100 explanation="The controller is ready and listening for new" +
2101 " switch connections"),
2102 @LogMessageDoc(message="Storage exception in controller " +
2103 "updates loop; terminating process",
2104 explanation=ERROR_DATABASE,
2105 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2106 @LogMessageDoc(level="ERROR",
2107 message="Exception in controller updates loop",
2108 explanation="Failed to dispatch controller event",
2109 recommendation=LogMessageDoc.GENERIC_ACTION)
2110 })
2111 public void run() {
2112 if (log.isDebugEnabled()) {
2113 logListeners();
2114 }
2115
2116 try {
2117 final ServerBootstrap bootstrap = createServerBootStrap();
2118
2119 bootstrap.setOption("reuseAddr", true);
2120 bootstrap.setOption("child.keepAlive", true);
2121 bootstrap.setOption("child.tcpNoDelay", true);
2122 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2123
2124 ChannelPipelineFactory pfact =
2125 new OpenflowPipelineFactory(this, null);
2126 bootstrap.setPipelineFactory(pfact);
2127 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2128 final ChannelGroup cg = new DefaultChannelGroup();
2129 cg.add(bootstrap.bind(sa));
2130
2131 log.info("Listening for switch connections on {}", sa);
2132 } catch (Exception e) {
2133 throw new RuntimeException(e);
2134 }
2135
2136 // main loop
2137 while (true) {
2138 try {
2139 IUpdate update = updates.take();
2140 update.dispatch();
2141 } catch (InterruptedException e) {
2142 return;
2143 } catch (StorageException e) {
2144 log.error("Storage exception in controller " +
2145 "updates loop; terminating process", e);
2146 return;
2147 } catch (Exception e) {
2148 log.error("Exception in controller updates loop", e);
2149 }
2150 }
2151 }
2152
2153 private ServerBootstrap createServerBootStrap() {
2154 if (workerThreads == 0) {
2155 return new ServerBootstrap(
2156 new NioServerSocketChannelFactory(
2157 Executors.newCachedThreadPool(),
2158 Executors.newCachedThreadPool()));
2159 } else {
2160 return new ServerBootstrap(
2161 new NioServerSocketChannelFactory(
2162 Executors.newCachedThreadPool(),
2163 Executors.newCachedThreadPool(), workerThreads));
2164 }
2165 }
2166
2167 public void setConfigParams(Map<String, String> configParams) {
2168 String ofPort = configParams.get("openflowport");
2169 if (ofPort != null) {
2170 this.openFlowPort = Integer.parseInt(ofPort);
2171 }
2172 log.debug("OpenFlow port set to {}", this.openFlowPort);
2173 String threads = configParams.get("workerthreads");
2174 if (threads != null) {
2175 this.workerThreads = Integer.parseInt(threads);
2176 }
2177 log.debug("Number of worker threads set to {}", this.workerThreads);
2178 String controllerId = configParams.get("controllerid");
2179 if (controllerId != null) {
2180 this.controllerId = controllerId;
2181 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002182 else {
2183 //Try to get the hostname of the machine and use that for controller ID
2184 try {
2185 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2186 this.controllerId = hostname;
2187 } catch (UnknownHostException e) {
2188 // Can't get hostname, we'll just use the default
2189 }
2190 }
2191
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002192 log.debug("ControllerId set to {}", this.controllerId);
2193 }
2194
2195 private void initVendorMessages() {
2196 // Configure openflowj to be able to parse the role request/reply
2197 // vendor messages.
2198 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2199 OFNiciraVendorData.NX_VENDOR_ID, 4);
2200 OFVendorId.registerVendorId(niciraVendorId);
2201 OFBasicVendorDataType roleRequestVendorData =
2202 new OFBasicVendorDataType(
2203 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2204 OFRoleRequestVendorData.getInstantiable());
2205 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2206 OFBasicVendorDataType roleReplyVendorData =
2207 new OFBasicVendorDataType(
2208 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2209 OFRoleReplyVendorData.getInstantiable());
2210 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2211 }
2212
2213 /**
2214 * Initialize internal data structures
2215 */
2216 public void init(Map<String, String> configParams) {
2217 // These data structures are initialized here because other
2218 // module's startUp() might be called before ours
2219 this.messageListeners =
2220 new ConcurrentHashMap<OFType,
2221 ListenerDispatcher<OFType,
2222 IOFMessageListener>>();
2223 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2224 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2225 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2226 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2227 this.controllerNodeIPsCache = new HashMap<String, String>();
2228 this.updates = new LinkedBlockingQueue<IUpdate>();
2229 this.factory = new BasicFactory();
2230 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2231 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002232 //this.role = getInitialRole(configParams);
2233 //Set the controller's role to MASTER so it always tries to do role requests.
2234 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002235 this.roleChanger = new RoleChanger();
2236 initVendorMessages();
2237 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002238 }
2239
2240 /**
2241 * Startup all of the controller's components
2242 */
2243 @LogMessageDoc(message="Waiting for storage source",
2244 explanation="The system database is not yet ready",
2245 recommendation="If this message persists, this indicates " +
2246 "that the system database has failed to start. " +
2247 LogMessageDoc.CHECK_CONTROLLER)
2248 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002249 try {
2250 registryService.registerController(controllerId);
2251 } catch (RegistryException e2) {
2252 log.warn("Registry service error: {}", e2.getMessage());
2253 }
2254
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002255 // Create the table names we use
2256 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2257 storageSource.createTable(SWITCH_TABLE_NAME, null);
2258 storageSource.createTable(PORT_TABLE_NAME, null);
2259 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2260 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2261 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2262 CONTROLLER_ID);
2263 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2264 SWITCH_DATAPATH_ID);
2265 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2266 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2267 CONTROLLER_INTERFACE_ID);
2268 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2269
2270 while (true) {
2271 try {
2272 updateControllerInfo();
2273 break;
2274 }
2275 catch (StorageException e) {
2276 log.info("Waiting for storage source");
2277 try {
2278 Thread.sleep(1000);
2279 } catch (InterruptedException e1) {
2280 }
2281 }
2282 }
2283
2284 // Add our REST API
2285 restApi.addRestletRoutable(new CoreWebRoutable());
2286 }
2287
2288 @Override
2289 public void addInfoProvider(String type, IInfoProvider provider) {
2290 if (!providerMap.containsKey(type)) {
2291 providerMap.put(type, new ArrayList<IInfoProvider>());
2292 }
2293 providerMap.get(type).add(provider);
2294 }
2295
2296 @Override
2297 public void removeInfoProvider(String type, IInfoProvider provider) {
2298 if (!providerMap.containsKey(type)) {
2299 log.debug("Provider type {} doesn't exist.", type);
2300 return;
2301 }
2302
2303 providerMap.get(type).remove(provider);
2304 }
2305
2306 public Map<String, Object> getControllerInfo(String type) {
2307 if (!providerMap.containsKey(type)) return null;
2308
2309 Map<String, Object> result = new LinkedHashMap<String, Object>();
2310 for (IInfoProvider provider : providerMap.get(type)) {
2311 result.putAll(provider.getInfo(type));
2312 }
2313
2314 return result;
2315 }
2316
2317 @Override
2318 public void addHAListener(IHAListener listener) {
2319 this.haListeners.add(listener);
2320 }
2321
2322 @Override
2323 public void removeHAListener(IHAListener listener) {
2324 this.haListeners.remove(listener);
2325 }
2326
2327
2328 /**
2329 * Handle changes to the controller nodes IPs and dispatch update.
2330 */
2331 @SuppressWarnings("unchecked")
2332 protected void handleControllerNodeIPChanges() {
2333 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2334 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2335 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2336 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2337 CONTROLLER_INTERFACE_TYPE,
2338 CONTROLLER_INTERFACE_NUMBER,
2339 CONTROLLER_INTERFACE_DISCOVERED_IP };
2340 synchronized(controllerNodeIPsCache) {
2341 // We currently assume that interface Ethernet0 is the relevant
2342 // controller interface. Might change.
2343 // We could (should?) implement this using
2344 // predicates, but creating the individual and compound predicate
2345 // seems more overhead then just checking every row. Particularly,
2346 // since the number of rows is small and changes infrequent
2347 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2348 colNames,null, null);
2349 while (res.next()) {
2350 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2351 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2352 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2353 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2354 String curIP = controllerNodeIPsCache.get(controllerID);
2355
2356 curControllerNodeIPs.put(controllerID, discoveredIP);
2357 if (curIP == null) {
2358 // new controller node IP
2359 addedControllerNodeIPs.put(controllerID, discoveredIP);
2360 }
2361 else if (curIP != discoveredIP) {
2362 // IP changed
2363 removedControllerNodeIPs.put(controllerID, curIP);
2364 addedControllerNodeIPs.put(controllerID, discoveredIP);
2365 }
2366 }
2367 }
2368 // Now figure out if rows have been deleted. We can't use the
2369 // rowKeys from rowsDeleted directly, since the tables primary
2370 // key is a compound that we can't disassemble
2371 Set<String> curEntries = curControllerNodeIPs.keySet();
2372 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2373 removedEntries.removeAll(curEntries);
2374 for (String removedControllerID : removedEntries)
2375 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2376 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2377 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2378 curControllerNodeIPs, addedControllerNodeIPs,
2379 removedControllerNodeIPs);
2380 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2381 try {
2382 this.updates.put(update);
2383 } catch (InterruptedException e) {
2384 log.error("Failure adding update to queue", e);
2385 }
2386 }
2387 }
2388 }
2389
2390 @Override
2391 public Map<String, String> getControllerNodeIPs() {
2392 // We return a copy of the mapping so we can guarantee that
2393 // the mapping return is the same as one that will be (or was)
2394 // dispatched to IHAListeners
2395 HashMap<String,String> retval = new HashMap<String,String>();
2396 synchronized(controllerNodeIPsCache) {
2397 retval.putAll(controllerNodeIPsCache);
2398 }
2399 return retval;
2400 }
2401
2402 @Override
2403 public void rowsModified(String tableName, Set<Object> rowKeys) {
2404 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2405 handleControllerNodeIPChanges();
2406 }
2407
2408 }
2409
2410 @Override
2411 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2412 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2413 handleControllerNodeIPChanges();
2414 }
2415 }
2416
2417 @Override
2418 public long getSystemStartTime() {
2419 return (this.systemStartTime);
2420 }
2421
2422 @Override
2423 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2424 this.alwaysClearFlowsOnSwAdd = value;
2425 }
2426
2427 public boolean getAlwaysClearFlowsOnSwAdd() {
2428 return this.alwaysClearFlowsOnSwAdd;
2429 }
2430}