blob: 9d4ac9a3c3e6955daea9b9e8c881f8cf0d93cd8d [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080058import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berdedc73bb12013-08-14 13:46:38 -070061import net.floodlightcontroller.core.IUpdate;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080062import net.floodlightcontroller.core.annotations.LogMessageDoc;
63import net.floodlightcontroller.core.annotations.LogMessageDocs;
64import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
65import net.floodlightcontroller.core.util.ListenerDispatcher;
66import net.floodlightcontroller.core.web.CoreWebRoutable;
67import net.floodlightcontroller.counter.ICounterStoreService;
68import net.floodlightcontroller.packet.Ethernet;
69import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
70import net.floodlightcontroller.restserver.IRestApiService;
71import net.floodlightcontroller.storage.IResultSet;
72import net.floodlightcontroller.storage.IStorageSourceListener;
73import net.floodlightcontroller.storage.IStorageSourceService;
74import net.floodlightcontroller.storage.OperatorPredicate;
75import net.floodlightcontroller.storage.StorageException;
76import net.floodlightcontroller.threadpool.IThreadPoolService;
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -070077import net.onrc.onos.ofcontroller.core.IOFSwitchPortListener;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080078import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080079import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080080import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080081
82import org.jboss.netty.bootstrap.ServerBootstrap;
83import org.jboss.netty.buffer.ChannelBuffer;
84import org.jboss.netty.buffer.ChannelBuffers;
85import org.jboss.netty.channel.Channel;
86import org.jboss.netty.channel.ChannelHandlerContext;
87import org.jboss.netty.channel.ChannelPipelineFactory;
88import org.jboss.netty.channel.ChannelStateEvent;
89import org.jboss.netty.channel.ChannelUpstreamHandler;
90import org.jboss.netty.channel.Channels;
91import org.jboss.netty.channel.ExceptionEvent;
92import org.jboss.netty.channel.MessageEvent;
93import org.jboss.netty.channel.group.ChannelGroup;
94import org.jboss.netty.channel.group.DefaultChannelGroup;
95import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
96import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
97import org.jboss.netty.handler.timeout.IdleStateEvent;
98import org.jboss.netty.handler.timeout.ReadTimeoutException;
99import org.openflow.protocol.OFEchoReply;
100import org.openflow.protocol.OFError;
101import org.openflow.protocol.OFError.OFBadActionCode;
102import org.openflow.protocol.OFError.OFBadRequestCode;
103import org.openflow.protocol.OFError.OFErrorType;
104import org.openflow.protocol.OFError.OFFlowModFailedCode;
105import org.openflow.protocol.OFError.OFHelloFailedCode;
106import org.openflow.protocol.OFError.OFPortModFailedCode;
107import org.openflow.protocol.OFError.OFQueueOpFailedCode;
108import org.openflow.protocol.OFFeaturesReply;
109import org.openflow.protocol.OFGetConfigReply;
110import org.openflow.protocol.OFMessage;
111import org.openflow.protocol.OFPacketIn;
112import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800113import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800114import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800115import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800116import org.openflow.protocol.OFPortStatus.OFPortReason;
117import org.openflow.protocol.OFSetConfig;
118import org.openflow.protocol.OFStatisticsRequest;
119import org.openflow.protocol.OFSwitchConfig;
120import org.openflow.protocol.OFType;
121import org.openflow.protocol.OFVendor;
122import org.openflow.protocol.factory.BasicFactory;
123import org.openflow.protocol.factory.MessageParseException;
124import org.openflow.protocol.statistics.OFDescriptionStatistics;
125import org.openflow.protocol.statistics.OFStatistics;
126import org.openflow.protocol.statistics.OFStatisticsType;
127import org.openflow.protocol.vendor.OFBasicVendorDataType;
128import org.openflow.protocol.vendor.OFBasicVendorId;
129import org.openflow.protocol.vendor.OFVendorId;
130import org.openflow.util.HexString;
131import org.openflow.util.U16;
132import org.openflow.util.U32;
133import org.openflow.vendor.nicira.OFNiciraVendorData;
134import org.openflow.vendor.nicira.OFRoleReplyVendorData;
135import org.openflow.vendor.nicira.OFRoleRequestVendorData;
136import org.openflow.vendor.nicira.OFRoleVendorData;
137import org.slf4j.Logger;
138import org.slf4j.LoggerFactory;
139
140
141/**
142 * The main controller class. Handles all setup and network listeners
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700143 *
144 * Extensions made by ONOS are:
145 * - Detailed Port event: PORTCHANGED -> {PORTCHANGED, PORTADDED, PORTREMOVED}
146 * Available as net.onrc.onos.ofcontroller.core.IOFSwitchPortListener
147 * - Distributed ownership control of switch through RegistryService(IControllerRegistryService)
Pavlin Radoslavova653e9f2013-10-16 03:08:52 -0700148 * - Register ONOS services. (IControllerRegistryService)
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700149 * - Additional DEBUG logs
150 * - Try using hostname as controller ID, when ID was not explicitly given.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800151 */
152public class Controller implements IFloodlightProviderService,
153 IStorageSourceListener {
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700154
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800155 protected static Logger log = LoggerFactory.getLogger(Controller.class);
156
157 private static final String ERROR_DATABASE =
158 "The controller could not communicate with the system database.";
159
160 protected BasicFactory factory;
161 protected ConcurrentMap<OFType,
162 ListenerDispatcher<OFType,IOFMessageListener>>
163 messageListeners;
164 // The activeSwitches map contains only those switches that are actively
165 // being controlled by us -- it doesn't contain switches that are
166 // in the slave role
167 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
168 // connectedSwitches contains all connected switches, including ones where
169 // we're a slave controller. We need to keep track of them so that we can
170 // send role request messages to switches when our role changes to master
171 // We add a switch to this set after it successfully completes the
172 // handshake. Access to this Set needs to be synchronized with roleChanger
173 protected HashSet<OFSwitchImpl> connectedSwitches;
174
175 // The controllerNodeIPsCache maps Controller IDs to their IP address.
176 // It's only used by handleControllerNodeIPsChanged
177 protected HashMap<String, String> controllerNodeIPsCache;
178
179 protected Set<IOFSwitchListener> switchListeners;
180 protected Set<IHAListener> haListeners;
181 protected Map<String, List<IInfoProvider>> providerMap;
182 protected BlockingQueue<IUpdate> updates;
183
184 // Module dependencies
185 protected IRestApiService restApi;
186 protected ICounterStoreService counterStore = null;
187 protected IStorageSourceService storageSource;
188 protected IPktInProcessingTimeService pktinProcTime;
189 protected IThreadPoolService threadPool;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800190 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800191
192 // Configuration options
193 protected int openFlowPort = 6633;
194 protected int workerThreads = 0;
195 // The id for this controller node. Should be unique for each controller
196 // node in a controller cluster.
197 protected String controllerId = "localhost";
198
199 // The current role of the controller.
200 // If the controller isn't configured to support roles, then this is null.
201 protected Role role;
202 // A helper that handles sending and timeout handling for role requests
203 protected RoleChanger roleChanger;
204
205 // Start time of the controller
206 protected long systemStartTime;
207
208 // Flag to always flush flow table on switch reconnect (HA or otherwise)
209 protected boolean alwaysClearFlowsOnSwAdd = false;
210
211 // Storage table names
212 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
213 protected static final String CONTROLLER_ID = "id";
214
215 protected static final String SWITCH_TABLE_NAME = "controller_switch";
216 protected static final String SWITCH_DATAPATH_ID = "dpid";
217 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
218 protected static final String SWITCH_IP = "ip";
219 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
220 protected static final String SWITCH_ACTIVE = "active";
221 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
222 protected static final String SWITCH_CAPABILITIES = "capabilities";
223 protected static final String SWITCH_BUFFERS = "buffers";
224 protected static final String SWITCH_TABLES = "tables";
225 protected static final String SWITCH_ACTIONS = "actions";
226
227 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
228 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
229
230 protected static final String PORT_TABLE_NAME = "controller_port";
231 protected static final String PORT_ID = "id";
232 protected static final String PORT_SWITCH = "switch_id";
233 protected static final String PORT_NUMBER = "number";
234 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
235 protected static final String PORT_NAME = "name";
236 protected static final String PORT_CONFIG = "config";
237 protected static final String PORT_STATE = "state";
238 protected static final String PORT_CURRENT_FEATURES = "current_features";
239 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
240 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
241 protected static final String PORT_PEER_FEATURES = "peer_features";
242
243 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
244 protected static final String CONTROLLER_INTERFACE_ID = "id";
245 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
246 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
247 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
248 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
249
250
251
252 // Perf. related configuration
253 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
254 protected static final int BATCH_MAX_SIZE = 100;
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700255 protected static final boolean ALWAYS_DECODE_ETH = true;
256
/**
 * Kinds of switch-related events that a {@link SwitchUpdate} can carry.
 * The declaration order is unchanged: ADDED, REMOVED, PORTCHANGED,
 * PORTADDED, PORTREMOVED.
 */
public enum SwitchUpdateType {
    /** A switch was added; delivered via {@code addedSwitch}. */
    ADDED,
    /** A switch was removed; delivered via {@code removedSwitch}. */
    REMOVED,
    /** A port on a switch changed; delivered via {@code switchPortChanged}. */
    PORTCHANGED,
    /** A port was added; delivered only to {@code IOFSwitchPortListener}s. */
    PORTADDED,
    /** A port was removed; delivered only to {@code IOFSwitchPortListener}s. */
    PORTREMOVED
}
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700264
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800265 /**
266 * Update message indicating a switch was added or removed
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700267 * ONOS: This message extended to indicate Port add or removed event.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800268 */
269 protected class SwitchUpdate implements IUpdate {
270 public IOFSwitch sw;
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700271 public OFPhysicalPort port; // Added by ONOS
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800272 public SwitchUpdateType switchUpdateType;
273 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
274 this.sw = sw;
275 this.switchUpdateType = switchUpdateType;
276 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700277 public SwitchUpdate(IOFSwitch sw, OFPhysicalPort port, SwitchUpdateType switchUpdateType) {
278 this.sw = sw;
279 this.port = port;
280 this.switchUpdateType = switchUpdateType;
281 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800282 public void dispatch() {
283 if (log.isTraceEnabled()) {
284 log.trace("Dispatching switch update {} {}",
285 sw, switchUpdateType);
286 }
287 if (switchListeners != null) {
288 for (IOFSwitchListener listener : switchListeners) {
289 switch(switchUpdateType) {
290 case ADDED:
291 listener.addedSwitch(sw);
292 break;
293 case REMOVED:
294 listener.removedSwitch(sw);
295 break;
296 case PORTCHANGED:
297 listener.switchPortChanged(sw.getId());
298 break;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700299 case PORTADDED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700300 if (listener instanceof IOFSwitchPortListener) {
301 ((IOFSwitchPortListener) listener).switchPortAdded(sw.getId(), port);
302 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700303 break;
304 case PORTREMOVED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700305 if (listener instanceof IOFSwitchPortListener) {
306 ((IOFSwitchPortListener) listener).switchPortRemoved(sw.getId(), port);
307 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700308 break;
309 default:
310 break;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800311 }
312 }
313 }
314 }
315 }
316
317 /**
318 * Update message indicating controller's role has changed
319 */
320 protected class HARoleUpdate implements IUpdate {
321 public Role oldRole;
322 public Role newRole;
323 public HARoleUpdate(Role newRole, Role oldRole) {
324 this.oldRole = oldRole;
325 this.newRole = newRole;
326 }
327 public void dispatch() {
328 // Make sure that old and new roles are different.
329 if (oldRole == newRole) {
330 if (log.isTraceEnabled()) {
331 log.trace("HA role update ignored as the old and " +
332 "new roles are the same. newRole = {}" +
333 "oldRole = {}", newRole, oldRole);
334 }
335 return;
336 }
337 if (log.isTraceEnabled()) {
338 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
339 newRole, oldRole);
340 }
341 if (haListeners != null) {
342 for (IHAListener listener : haListeners) {
343 listener.roleChanged(oldRole, newRole);
344 }
345 }
346 }
347 }
348
349 /**
350 * Update message indicating
351 * IPs of controllers in controller cluster have changed.
352 */
353 protected class HAControllerNodeIPUpdate implements IUpdate {
354 public Map<String,String> curControllerNodeIPs;
355 public Map<String,String> addedControllerNodeIPs;
356 public Map<String,String> removedControllerNodeIPs;
357 public HAControllerNodeIPUpdate(
358 HashMap<String,String> curControllerNodeIPs,
359 HashMap<String,String> addedControllerNodeIPs,
360 HashMap<String,String> removedControllerNodeIPs) {
361 this.curControllerNodeIPs = curControllerNodeIPs;
362 this.addedControllerNodeIPs = addedControllerNodeIPs;
363 this.removedControllerNodeIPs = removedControllerNodeIPs;
364 }
365 public void dispatch() {
366 if (log.isTraceEnabled()) {
367 log.trace("Dispatching HA Controller Node IP update "
368 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
369 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
370 removedControllerNodeIPs }
371 );
372 }
373 if (haListeners != null) {
374 for (IHAListener listener: haListeners) {
375 listener.controllerNodeIPsChanged(curControllerNodeIPs,
376 addedControllerNodeIPs, removedControllerNodeIPs);
377 }
378 }
379 }
380 }
381
382 // ***************
383 // Getters/Setters
384 // ***************
385
386 public void setStorageSourceService(IStorageSourceService storageSource) {
387 this.storageSource = storageSource;
388 }
389
390 public void setCounterStore(ICounterStoreService counterStore) {
391 this.counterStore = counterStore;
392 }
393
394 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
395 this.pktinProcTime = pits;
396 }
397
398 public void setRestApiService(IRestApiService restApi) {
399 this.restApi = restApi;
400 }
401
402 public void setThreadPoolService(IThreadPoolService tp) {
403 this.threadPool = tp;
404 }
405
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800406 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800407 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800408 }
409
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800410 @Override
411 public Role getRole() {
412 synchronized(roleChanger) {
413 return role;
414 }
415 }
416
417 @Override
418 public void setRole(Role role) {
419 if (role == null) throw new NullPointerException("Role can not be null.");
420 if (role == Role.MASTER && this.role == Role.SLAVE) {
421 // Reset db state to Inactive for all switches.
422 updateAllInactiveSwitchInfo();
423 }
424
425 // Need to synchronize to ensure a reliable ordering on role request
426 // messages send and to ensure the list of connected switches is stable
427 // RoleChanger will handle the actual sending of the message and
428 // timeout handling
429 // @see RoleChanger
430 synchronized(roleChanger) {
431 if (role.equals(this.role)) {
432 log.debug("Ignoring role change: role is already {}", role);
433 return;
434 }
435
436 Role oldRole = this.role;
437 this.role = role;
438
439 log.debug("Submitting role change request to role {}", role);
440 roleChanger.submitRequest(connectedSwitches, role);
441
442 // Enqueue an update for our listeners.
443 try {
444 this.updates.put(new HARoleUpdate(role, oldRole));
445 } catch (InterruptedException e) {
446 log.error("Failure adding update to queue", e);
447 }
448 }
449 }
450
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700451 public void publishUpdate(IUpdate update) {
452 try {
453 this.updates.put(update);
454 } catch (InterruptedException e) {
455 log.error("Failure adding update to queue", e);
456 }
457 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800458
459 // **********************
460 // ChannelUpstreamHandler
461 // **********************
462
463 /**
464 * Return a new channel handler for processing a switch connections
465 * @param state The channel state object for the connection
466 * @return the new channel handler
467 */
468 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
469 return new OFChannelHandler(state);
470 }
471
Jonathan Hartcc957a02013-02-26 10:39:04 -0800472 protected class RoleChangeCallback implements ControlChangeCallback {
473 @Override
474 public void controlChanged(long dpid, boolean hasControl) {
475 log.info("Role change callback for switch {}, hasControl {}",
476 HexString.toHexString(dpid), hasControl);
477
478 synchronized(roleChanger){
479 OFSwitchImpl sw = null;
480 for (OFSwitchImpl connectedSw : connectedSwitches){
481 if (connectedSw.getId() == dpid){
482 sw = connectedSw;
483 break;
484 }
485 }
486 if (sw == null){
487 log.warn("Switch {} not found in connected switches",
488 HexString.toHexString(dpid));
489 return;
490 }
491
492 Role role = null;
493
Pankaj Berde01939e92013-03-08 14:38:27 -0800494 /*
495 * issue #229
496 * Cannot rely on sw.getRole() as it can be behind due to pending
497 * role changes in the queue. Just submit it and late the RoleChanger
498 * handle duplicates.
499 */
500
501 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800502 role = Role.MASTER;
503 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800504 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800505 role = Role.SLAVE;
506 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800507
508 log.debug("Sending role request {} msg to {}", role, sw);
509 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
510 swList.add(sw);
511 roleChanger.submitRequest(swList, role);
512
Jonathan Hartcc957a02013-02-26 10:39:04 -0800513 }
514
515 }
516 }
517
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800518 /**
519 * Channel handler deals with the switch connection and dispatches
520 * switch messages to the appropriate locations.
521 * @author readams
522 */
523 protected class OFChannelHandler
524 extends IdleStateAwareChannelUpstreamHandler {
525 protected OFSwitchImpl sw;
526 protected OFChannelState state;
527
/**
 * Creates a handler bound to the given per-connection state.
 *
 * @param state the channel state object for the connection
 */
public OFChannelHandler(OFChannelState state) {
    this.state = state;
}
531
532 @Override
533 @LogMessageDoc(message="New switch connection from {ip address}",
534 explanation="A new switch has connected from the " +
535 "specified IP address")
536 public void channelConnected(ChannelHandlerContext ctx,
537 ChannelStateEvent e) throws Exception {
538 log.info("New switch connection from {}",
539 e.getChannel().getRemoteAddress());
540
541 sw = new OFSwitchImpl();
542 sw.setChannel(e.getChannel());
543 sw.setFloodlightProvider(Controller.this);
544 sw.setThreadPoolService(threadPool);
545
546 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
547 msglist.add(factory.getMessage(OFType.HELLO));
548 e.getChannel().write(msglist);
549
550 }
551
552 @Override
553 @LogMessageDoc(message="Disconnected switch {switch information}",
554 explanation="The specified switch has disconnected.")
555 public void channelDisconnected(ChannelHandlerContext ctx,
556 ChannelStateEvent e) throws Exception {
557 if (sw != null && state.hsState == HandshakeState.READY) {
558 if (activeSwitches.containsKey(sw.getId())) {
559 // It's safe to call removeSwitch even though the map might
560 // not contain this particular switch but another with the
561 // same DPID
562 removeSwitch(sw);
563 }
564 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700565 if (controlRequested) {
566 registryService.releaseControl(sw.getId());
567 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800568 connectedSwitches.remove(sw);
569 }
570 sw.setConnected(false);
571 }
572 log.info("Disconnected switch {}", sw);
573 }
574
/**
 * Reacts to an exception raised on the switch channel. The cause is
 * classified in a fixed order; most causes are logged and the channel is
 * closed. Three causes leave the channel open: an already closed channel
 * (ignored), a storage exception (the controller is terminated) and a
 * rejected execution (a warning is logged).
 */
@Override
@LogMessageDocs({
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to read timeout",
            explanation="The connected switch has failed to send any " +
                        "messages or respond to echo requests",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch}: failed to " +
                    "complete handshake",
            explanation="The switch did not respond correctly " +
                        "to handshake messages",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to IO Error: {}",
            explanation="There was an error communicating with the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to switch " +
                    "state error: {error}",
            explanation="The switch sent an unexpected message",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to " +
                    "message parse failure",
            explanation="Could not parse a message from the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Terminating controller due to storage exception",
            explanation=ERROR_DATABASE,
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Could not process message: queue full",
            explanation="OpenFlow messages are arriving faster than " +
                        " the controller can process them.",
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Error while processing message " +
                    "from switch {switch} {cause}",
            explanation="An error occurred processing the switch message",
            recommendation=LogMessageDoc.GENERIC_ACTION)
})
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    final Throwable cause = e.getCause();
    // Cleared by the branches that must leave the channel open.
    boolean closeChannel = true;

    // The order of these checks matters: ClosedChannelException is an
    // IOException, so it has to be tested before the general IO case.
    if (cause instanceof ReadTimeoutException) {
        // switch timeout
        log.error("Disconnecting switch {} due to read timeout", sw);
    } else if (cause instanceof HandshakeTimeoutException) {
        log.error("Disconnecting switch {}: failed to complete handshake",
                  sw);
    } else if (cause instanceof ClosedChannelException) {
        // Channel is already closed: nothing to log, nothing to close.
        closeChannel = false;
    } else if (cause instanceof IOException) {
        log.error("Disconnecting switch {} due to IO Error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof SwitchStateException) {
        log.error("Disconnecting switch {} due to switch state error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof MessageParseException) {
        log.error("Disconnecting switch " + sw +
                  " due to message parse failure",
                  cause);
    } else if (cause instanceof StorageException) {
        log.error("Terminating controller due to storage exception",
                  cause);
        terminate();
        closeChannel = false;
    } else if (cause instanceof RejectedExecutionException) {
        log.warn("Could not process message: queue full");
        closeChannel = false;
    } else {
        log.error("Error while processing message from switch " + sw,
                  cause);
    }

    if (closeChannel) {
        ctx.getChannel().close();
    }
}
654
655 @Override
656 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
657 throws Exception {
658 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
659 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
660 e.getChannel().write(msglist);
661 }
662
663 @Override
664 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
665 throws Exception {
666 if (e.getMessage() instanceof List) {
667 @SuppressWarnings("unchecked")
668 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
669
670 for (OFMessage ofm : msglist) {
671 try {
672 processOFMessage(ofm);
673 }
674 catch (Exception ex) {
675 // We are the last handler in the stream, so run the
676 // exception through the channel again by passing in
677 // ctx.getChannel().
678 Channels.fireExceptionCaught(ctx.getChannel(), ex);
679 }
680 }
681
682 // Flush all flow-mods/packet-out generated from this "train"
683 OFSwitchImpl.flush_all();
684 }
685 }
686
687 /**
688 * Process the request for the switch description
689 */
690 @LogMessageDoc(level="ERROR",
691 message="Exception in reading description " +
692 " during handshake {exception}",
693 explanation="Could not process the switch description string",
694 recommendation=LogMessageDoc.CHECK_SWITCH)
695 void processSwitchDescReply() {
696 try {
697 // Read description, if it has been updated
698 @SuppressWarnings("unchecked")
699 Future<List<OFStatistics>> desc_future =
700 (Future<List<OFStatistics>>)sw.
701 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
702 List<OFStatistics> values =
703 desc_future.get(0, TimeUnit.MILLISECONDS);
704 if (values != null) {
705 OFDescriptionStatistics description =
706 new OFDescriptionStatistics();
707 ChannelBuffer data =
708 ChannelBuffers.buffer(description.getLength());
709 for (OFStatistics f : values) {
710 f.writeTo(data);
711 description.readFrom(data);
712 break; // SHOULD be a list of length 1
713 }
714 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
715 description);
716 sw.setSwitchProperties(description);
717 data = null;
718
719 // At this time, also set other switch properties from storage
720 boolean is_core_switch = false;
721 IResultSet resultSet = null;
722 try {
723 String swid = sw.getStringId();
724 resultSet =
725 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
726 for (Iterator<IResultSet> it =
727 resultSet.iterator(); it.hasNext();) {
728 // In case of multiple rows, use the status
729 // in last row?
730 Map<String, Object> row = it.next().getRow();
731 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
732 if (log.isDebugEnabled()) {
733 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
734 "config for switch={}, is-core={}",
735 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
736 }
737 String ics =
738 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
739 is_core_switch = ics.equals("true");
740 }
741 }
742 }
743 finally {
744 if (resultSet != null)
745 resultSet.close();
746 }
747 if (is_core_switch) {
748 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
749 new Boolean(true));
750 }
751 }
752 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
753 state.hasDescription = true;
754 checkSwitchReady();
755 }
756 catch (InterruptedException ex) {
757 // Ignore
758 }
759 catch (TimeoutException ex) {
760 // Ignore
761 } catch (Exception ex) {
762 log.error("Exception in reading description " +
763 " during handshake", ex);
764 }
765 }
766
767 /**
768 * Send initial switch setup information that we need before adding
769 * the switch
770 * @throws IOException
771 */
772 void sendHelloConfiguration() throws IOException {
773 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700774 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800775 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
776 }
777
778 /**
779 * Send the configuration requests we can only do after we have
780 * the features reply
781 * @throws IOException
782 */
783 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700784 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800785 // Ensure we receive the full packet via PacketIn
786 OFSetConfig config = (OFSetConfig) factory
787 .getMessage(OFType.SET_CONFIG);
788 config.setMissSendLength((short) 0xffff)
789 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
790 sw.write(config, null);
791 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
792 null);
793
794 // Get Description to set switch-specific flags
795 OFStatisticsRequest req = new OFStatisticsRequest();
796 req.setStatisticType(OFStatisticsType.DESC);
797 req.setLengthU(req.getLengthU());
798 Future<List<OFStatistics>> dfuture =
799 sw.getStatistics(req);
800 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
801 dfuture);
802
803 }
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700804
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700805 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800806 protected void checkSwitchReady() {
807 if (state.hsState == HandshakeState.FEATURES_REPLY &&
808 state.hasDescription && state.hasGetConfigReply) {
809
810 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700811 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800812
813 synchronized(roleChanger) {
814 // We need to keep track of all of the switches that are connected
815 // to the controller, in any role, so that we can later send the
816 // role request messages when the controller role changes.
817 // We need to be synchronized while doing this: we must not
818 // send a another role request to the connectedSwitches until
819 // we were able to add this new switch to connectedSwitches
820 // *and* send the current role to the new switch.
821 connectedSwitches.add(sw);
822
823 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800824 //Put the switch in SLAVE mode until we know we have control
825 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
826 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
827 swList.add(sw);
828 roleChanger.submitRequest(swList, Role.SLAVE);
829
Jonathan Hartcc957a02013-02-26 10:39:04 -0800830 //Request control of the switch from the global registry
831 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700832 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800833 registryService.requestControl(sw.getId(),
834 new RoleChangeCallback());
835 } catch (RegistryException e) {
836 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700837 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800838 }
839
Jonathan Hart97801ac2013-02-26 14:29:16 -0800840
Jonathan Hartcc957a02013-02-26 10:39:04 -0800841
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800842 // Send a role request if role support is enabled for the controller
843 // This is a probe that we'll use to determine if the switch
844 // actually supports the role request message. If it does we'll
845 // get back a role reply message. If it doesn't we'll get back an
846 // OFError message.
847 // If role is MASTER we will promote switch to active
848 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800849 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800850 log.debug("This controller's role is {}, " +
851 "sending initial role request msg to {}",
852 role, sw);
853 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
854 swList.add(sw);
855 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800856 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800857 }
858 else {
859 // Role supported not enabled on controller (for now)
860 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800861 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800862 "not sending role request msg to {}",
863 role, sw);
864 // Need to clear FlowMods before we add the switch
865 // and dispatch updates otherwise we have a race condition.
866 sw.clearAllFlowMods();
867 addSwitch(sw);
868 state.firstRoleReplyReceived = true;
869 }
870 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700871 if (!controlRequested) {
872 // yield to allow other thread(s) to release control
873 try {
874 Thread.sleep(10);
875 } catch (InterruptedException e) {
876 // Ignore interruptions
877 }
878 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700879 log.debug("Closing {} because we weren't able to request control " +
880 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700881 sw.channel.close();
882 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800883 }
884 }
885
886 /* Handle a role reply message we received from the switch. Since
887 * netty serializes message dispatch we don't need to synchronize
888 * against other receive operations from the same switch, so no need
889 * to synchronize addSwitch(), removeSwitch() operations from the same
890 * connection.
891 * FIXME: However, when a switch with the same DPID connects we do
892 * need some synchronization. However, handling switches with same
893 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
894 * removedSwitch notification):1
895 *
896 */
897 @LogMessageDoc(level="ERROR",
898 message="Invalid role value in role reply message",
899 explanation="Was unable to set the HA role (master or slave) " +
900 "for the controller.",
901 recommendation=LogMessageDoc.CHECK_CONTROLLER)
902 protected void handleRoleReplyMessage(OFVendor vendorMessage,
903 OFRoleReplyVendorData roleReplyVendorData) {
904 // Map from the role code in the message to our role enum
905 int nxRole = roleReplyVendorData.getRole();
906 Role role = null;
907 switch (nxRole) {
908 case OFRoleVendorData.NX_ROLE_OTHER:
909 role = Role.EQUAL;
910 break;
911 case OFRoleVendorData.NX_ROLE_MASTER:
912 role = Role.MASTER;
913 break;
914 case OFRoleVendorData.NX_ROLE_SLAVE:
915 role = Role.SLAVE;
916 break;
917 default:
918 log.error("Invalid role value in role reply message");
919 sw.getChannel().close();
920 return;
921 }
922
923 log.debug("Handling role reply for role {} from {}. " +
924 "Controller's role is {} ",
925 new Object[] { role, sw, Controller.this.role}
926 );
927
928 sw.deliverRoleReply(vendorMessage.getXid(), role);
929
930 boolean isActive = activeSwitches.containsKey(sw.getId());
931 if (!isActive && sw.isActive()) {
932 // Transition from SLAVE to MASTER.
933
934 if (!state.firstRoleReplyReceived ||
935 getAlwaysClearFlowsOnSwAdd()) {
936 // This is the first role-reply message we receive from
937 // this switch or roles were disabled when the switch
938 // connected:
939 // Delete all pre-existing flows for new connections to
940 // the master
941 //
942 // FIXME: Need to think more about what the test should
943 // be for when we flush the flow-table? For example,
944 // if all the controllers are temporarily in the backup
945 // role (e.g. right after a failure of the master
946 // controller) at the point the switch connects, then
947 // all of the controllers will initially connect as
948 // backup controllers and not flush the flow-table.
949 // Then when one of them is promoted to master following
950 // the master controller election the flow-table
951 // will still not be flushed because that's treated as
952 // a failover event where we don't want to flush the
953 // flow-table. The end result would be that the flow
954 // table for a newly connected switch is never
955 // flushed. Not sure how to handle that case though...
956 sw.clearAllFlowMods();
957 log.debug("First role reply from master switch {}, " +
958 "clear FlowTable to active switch list",
959 HexString.toHexString(sw.getId()));
960 }
961
962 // Some switches don't seem to update us with port
963 // status messages while in slave role.
964 readSwitchPortStateFromStorage(sw);
965
966 // Only add the switch to the active switch list if
967 // we're not in the slave role. Note that if the role
968 // attribute is null, then that means that the switch
969 // doesn't support the role request messages, so in that
970 // case we're effectively in the EQUAL role and the
971 // switch should be included in the active switch list.
972 addSwitch(sw);
973 log.debug("Added master switch {} to active switch list",
974 HexString.toHexString(sw.getId()));
975
976 }
977 else if (isActive && !sw.isActive()) {
978 // Transition from MASTER to SLAVE: remove switch
979 // from active switch list.
980 log.debug("Removed slave switch {} from active switch" +
981 " list", HexString.toHexString(sw.getId()));
982 removeSwitch(sw);
983 }
984
985 // Indicate that we have received a role reply message.
986 state.firstRoleReplyReceived = true;
987 }
988
989 protected boolean handleVendorMessage(OFVendor vendorMessage) {
990 boolean shouldHandleMessage = false;
991 int vendor = vendorMessage.getVendor();
992 switch (vendor) {
993 case OFNiciraVendorData.NX_VENDOR_ID:
994 OFNiciraVendorData niciraVendorData =
995 (OFNiciraVendorData)vendorMessage.getVendorData();
996 int dataType = niciraVendorData.getDataType();
997 switch (dataType) {
998 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
999 OFRoleReplyVendorData roleReplyVendorData =
1000 (OFRoleReplyVendorData) niciraVendorData;
1001 handleRoleReplyMessage(vendorMessage,
1002 roleReplyVendorData);
1003 break;
1004 default:
1005 log.warn("Unhandled Nicira VENDOR message; " +
1006 "data type = {}", dataType);
1007 break;
1008 }
1009 break;
1010 default:
1011 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
1012 break;
1013 }
1014
1015 return shouldHandleMessage;
1016 }
1017
1018 /**
1019 * Dispatch an Openflow message from a switch to the appropriate
1020 * handler.
1021 * @param m The message to process
1022 * @throws IOException
1023 * @throws SwitchStateException
1024 */
1025 @LogMessageDocs({
1026 @LogMessageDoc(level="WARN",
1027 message="Config Reply from {switch} has " +
1028 "miss length set to {length}",
1029 explanation="The controller requires that the switch " +
1030 "use a miss length of 0xffff for correct " +
1031 "function",
1032 recommendation="Use a different switch to ensure " +
1033 "correct function"),
1034 @LogMessageDoc(level="WARN",
1035 message="Received ERROR from sw {switch} that "
1036 +"indicates roles are not supported "
1037 +"but we have received a valid "
1038 +"role reply earlier",
1039 explanation="The switch sent a confusing message to the" +
1040 "controller")
1041 })
1042 protected void processOFMessage(OFMessage m)
1043 throws IOException, SwitchStateException {
1044 boolean shouldHandleMessage = false;
1045
1046 switch (m.getType()) {
1047 case HELLO:
1048 if (log.isTraceEnabled())
1049 log.trace("HELLO from {}", sw);
1050
1051 if (state.hsState.equals(HandshakeState.START)) {
1052 state.hsState = HandshakeState.HELLO;
1053 sendHelloConfiguration();
1054 } else {
1055 throw new SwitchStateException("Unexpected HELLO from "
1056 + sw);
1057 }
1058 break;
1059 case ECHO_REQUEST:
1060 OFEchoReply reply =
1061 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
1062 reply.setXid(m.getXid());
1063 sw.write(reply, null);
1064 break;
1065 case ECHO_REPLY:
1066 break;
1067 case FEATURES_REPLY:
1068 if (log.isTraceEnabled())
1069 log.trace("Features Reply from {}", sw);
1070
1071 sw.setFeaturesReply((OFFeaturesReply) m);
1072 if (state.hsState.equals(HandshakeState.HELLO)) {
1073 sendFeatureReplyConfiguration();
1074 state.hsState = HandshakeState.FEATURES_REPLY;
1075 // uncomment to enable "dumb" switches like cbench
1076 // state.hsState = HandshakeState.READY;
1077 // addSwitch(sw);
1078 } else {
1079 // return results to rest api caller
1080 sw.deliverOFFeaturesReply(m);
1081 // update database */
1082 updateActiveSwitchInfo(sw);
1083 }
1084 break;
1085 case GET_CONFIG_REPLY:
1086 if (log.isTraceEnabled())
1087 log.trace("Get config reply from {}", sw);
1088
1089 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
1090 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
1091 throw new SwitchStateException(em);
1092 }
1093 OFGetConfigReply cr = (OFGetConfigReply) m;
1094 if (cr.getMissSendLength() == (short)0xffff) {
1095 log.trace("Config Reply from {} confirms " +
1096 "miss length set to 0xffff", sw);
1097 } else {
1098 log.warn("Config Reply from {} has " +
1099 "miss length set to {}",
1100 sw, cr.getMissSendLength() & 0xffff);
1101 }
1102 state.hasGetConfigReply = true;
1103 checkSwitchReady();
1104 break;
1105 case VENDOR:
1106 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1107 break;
1108 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001109 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001110 // TODO: we need better error handling. Especially for
1111 // request/reply style message (stats, roles) we should have
1112 // a unified way to lookup the xid in the error message.
1113 // This will probable involve rewriting the way we handle
1114 // request/reply style messages.
1115 OFError error = (OFError) m;
1116 boolean shouldLogError = true;
1117 // TODO: should we check that firstRoleReplyReceived is false,
1118 // i.e., check only whether the first request fails?
1119 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1120 boolean isBadVendorError =
1121 (error.getErrorType() == OFError.OFErrorType.
1122 OFPET_BAD_REQUEST.getValue());
1123 // We expect to receive a bad vendor error when
1124 // we're connected to a switch that doesn't support
1125 // the Nicira vendor extensions (i.e. not OVS or
1126 // derived from OVS). By protocol, it should also be
1127 // BAD_VENDOR, but too many switch implementations
1128 // get it wrong and we can already check the xid()
1129 // so we can ignore the type with confidence that this
1130 // is not a spurious error
1131 shouldLogError = !isBadVendorError;
1132 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001133 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001134 if (state.firstRoleReplyReceived && (role != null)) {
1135 log.warn("Received ERROR from sw {} that "
1136 +"indicates roles are not supported "
1137 +"but we have received a valid "
1138 +"role reply earlier", sw);
1139 }
1140 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001141 Role requestedRole =
HIGUCHI Yutaeae374d2013-06-17 10:39:42 -07001142 sw.deliverRoleRequestNotSupportedEx(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001143 synchronized(roleChanger) {
1144 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001145 //This will now never happen. The Controller's role
1146 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001147 // the switch doesn't understand role request
1148 // messages and the current controller role is
1149 // slave. We need to disconnect the switch.
1150 // @see RoleChanger for rationale
Jonathan Hart9e92c512013-03-20 16:24:44 -07001151 log.warn("Closing {} channel because controller's role " +
1152 "is SLAVE", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001153 sw.getChannel().close();
1154 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001155 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001156 log.debug("Adding switch {} because we got an error" +
1157 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001158 // Controller's role is master: add to
1159 // active
1160 // TODO: check if clearing flow table is
1161 // right choice here.
1162 // Need to clear FlowMods before we add the switch
1163 // and dispatch updates otherwise we have a race condition.
1164 // TODO: switch update is async. Won't we still have a potential
1165 // race condition?
1166 sw.clearAllFlowMods();
1167 addSwitch(sw);
1168 }
1169 }
1170 }
1171 else {
1172 // TODO: Is this the right thing to do if we receive
1173 // some other error besides a bad vendor error?
1174 // Presumably that means the switch did actually
1175 // understand the role request message, but there
1176 // was some other error from processing the message.
1177 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1178 // error code, but it doesn't look like the Nicira
1179 // role request has that. Should check OVS source
1180 // code to see if it's possible for any other errors
1181 // to be returned.
1182 // If we received an error the switch is not
1183 // in the correct role, so we need to disconnect it.
1184 // We could also resend the request but then we need to
1185 // check if there are other pending request in which
1186 // case we shouldn't resend. If we do resend we need
1187 // to make sure that the switch eventually accepts one
1188 // of our requests or disconnect the switch. This feels
1189 // cumbersome.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001190 log.debug("Closing {} channel because we recieved an " +
1191 "error other than BAD_VENDOR", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001192 sw.getChannel().close();
1193 }
1194 }
1195 // Once we support OF 1.2, we'd add code to handle it here.
1196 //if (error.getXid() == state.ofRoleRequestXid) {
1197 //}
1198 if (shouldLogError)
1199 logError(sw, error);
1200 break;
1201 case STATS_REPLY:
1202 if (state.hsState.ordinal() <
1203 HandshakeState.FEATURES_REPLY.ordinal()) {
1204 String em = "Unexpected STATS_REPLY from " + sw;
1205 throw new SwitchStateException(em);
1206 }
1207 sw.deliverStatisticsReply(m);
1208 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1209 processSwitchDescReply();
1210 }
1211 break;
1212 case PORT_STATUS:
1213 // We want to update our port state info even if we're in
1214 // the slave role, but we only want to update storage if
1215 // we're the master (or equal).
1216 boolean updateStorage = state.hsState.
1217 equals(HandshakeState.READY) &&
1218 (sw.getRole() != Role.SLAVE);
1219 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1220 shouldHandleMessage = true;
1221 break;
1222
1223 default:
1224 shouldHandleMessage = true;
1225 break;
1226 }
1227
1228 if (shouldHandleMessage) {
1229 sw.getListenerReadLock().lock();
1230 try {
1231 if (sw.isConnected()) {
1232 if (!state.hsState.equals(HandshakeState.READY)) {
1233 log.debug("Ignoring message type {} received " +
1234 "from switch {} before switch is " +
1235 "fully configured.", m.getType(), sw);
1236 }
1237 // Check if the controller is in the slave role for the
1238 // switch. If it is, then don't dispatch the message to
1239 // the listeners.
1240 // TODO: Should we dispatch messages that we expect to
1241 // receive when we're in the slave role, e.g. port
1242 // status messages? Since we're "hiding" switches from
1243 // the listeners when we're in the slave role, then it
1244 // seems a little weird to dispatch port status messages
1245 // to them. On the other hand there might be special
1246 // modules that care about all of the connected switches
1247 // and would like to receive port status notifications.
1248 else if (sw.getRole() == Role.SLAVE) {
1249 // Don't log message if it's a port status message
1250 // since we expect to receive those from the switch
1251 // and don't want to emit spurious messages.
1252 if (m.getType() != OFType.PORT_STATUS) {
1253 log.debug("Ignoring message type {} received " +
1254 "from switch {} while in the slave role.",
1255 m.getType(), sw);
1256 }
1257 } else {
1258 handleMessage(sw, m, null);
1259 }
1260 }
1261 }
1262 finally {
1263 sw.getListenerReadLock().unlock();
1264 }
1265 }
1266 }
1267 }
1268
1269 // ****************
1270 // Message handlers
1271 // ****************
1272
1273 protected void handlePortStatusMessage(IOFSwitch sw,
1274 OFPortStatus m,
1275 boolean updateStorage) {
1276 short portNumber = m.getDesc().getPortNumber();
1277 OFPhysicalPort port = m.getDesc();
1278 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001279 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1280 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001281 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001282 if (!portDown) {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001283 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1284 try {
1285 this.updates.put(update);
1286 } catch (InterruptedException e) {
1287 log.error("Failure adding update to queue", e);
1288 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001289 } else {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001290 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1291 try {
1292 this.updates.put(update);
1293 } catch (InterruptedException e) {
1294 log.error("Failure adding update to queue", e);
1295 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001296 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001297 if (updateStorage)
1298 updatePortInfo(sw, port);
1299 log.debug("Port #{} modified for {}", portNumber, sw);
1300 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1301 sw.setPort(port);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001302 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1303 try {
1304 this.updates.put(update);
1305 } catch (InterruptedException e) {
1306 log.error("Failure adding update to queue", e);
1307 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001308 if (updateStorage)
1309 updatePortInfo(sw, port);
1310 log.debug("Port #{} added for {}", portNumber, sw);
1311 } else if (m.getReason() ==
1312 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1313 sw.deletePort(portNumber);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001314 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1315 try {
1316 this.updates.put(update);
1317 } catch (InterruptedException e) {
1318 log.error("Failure adding update to queue", e);
1319 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001320 if (updateStorage)
1321 removePortInfo(sw, portNumber);
1322 log.debug("Port #{} deleted for {}", portNumber, sw);
1323 }
1324 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1325 try {
1326 this.updates.put(update);
1327 } catch (InterruptedException e) {
1328 log.error("Failure adding update to queue", e);
1329 }
1330 }
1331
1332 /**
1333 * flcontext_cache - Keep a thread local stack of contexts
1334 */
1335 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1336 new ThreadLocal <Stack<FloodlightContext>> () {
1337 @Override
1338 protected Stack<FloodlightContext> initialValue() {
1339 return new Stack<FloodlightContext>();
1340 }
1341 };
1342
1343 /**
1344 * flcontext_alloc - pop a context off the stack, if required create a new one
1345 * @return FloodlightContext
1346 */
1347 protected static FloodlightContext flcontext_alloc() {
1348 FloodlightContext flcontext = null;
1349
1350 if (flcontext_cache.get().empty()) {
1351 flcontext = new FloodlightContext();
1352 }
1353 else {
1354 flcontext = flcontext_cache.get().pop();
1355 }
1356
1357 return flcontext;
1358 }
1359
1360 /**
1361 * flcontext_free - Free the context to the current thread
1362 * @param flcontext
1363 */
1364 protected void flcontext_free(FloodlightContext flcontext) {
1365 flcontext.getStorage().clear();
1366 flcontext_cache.get().push(flcontext);
1367 }
1368
1369 /**
1370 * Handle replies to certain OFMessages, and pass others off to listeners
1371 * @param sw The switch for the message
1372 * @param m The message
1373 * @param bContext The floodlight context. If null then floodlight context would
1374 * be allocated in this function
1375 * @throws IOException
1376 */
1377 @LogMessageDocs({
1378 @LogMessageDoc(level="ERROR",
1379 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1380 " field is empty.",
1381 explanation="The switch sent an improperly-formatted PacketIn" +
1382 " message",
1383 recommendation=LogMessageDoc.CHECK_SWITCH),
1384 @LogMessageDoc(level="WARN",
1385 message="Unhandled OF Message: {} from {}",
1386 explanation="The switch sent a message not handled by " +
1387 "the controller")
1388 })
1389 protected void handleMessage(IOFSwitch sw, OFMessage m,
1390 FloodlightContext bContext)
1391 throws IOException {
1392 Ethernet eth = null;
1393
1394 switch (m.getType()) {
1395 case PACKET_IN:
1396 OFPacketIn pi = (OFPacketIn)m;
1397
1398 if (pi.getPacketData().length <= 0) {
1399 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1400 ") because the data field is empty.");
1401 return;
1402 }
1403
1404 if (Controller.ALWAYS_DECODE_ETH) {
1405 eth = new Ethernet();
1406 eth.deserialize(pi.getPacketData(), 0,
1407 pi.getPacketData().length);
1408 counterStore.updatePacketInCounters(sw, m, eth);
1409 }
1410 // fall through to default case...
1411
1412 default:
1413
1414 List<IOFMessageListener> listeners = null;
1415 if (messageListeners.containsKey(m.getType())) {
1416 listeners = messageListeners.get(m.getType()).
1417 getOrderedListeners();
1418 }
1419
1420 FloodlightContext bc = null;
1421 if (listeners != null) {
1422 // Check if floodlight context is passed from the calling
1423 // function, if so use that floodlight context, otherwise
1424 // allocate one
1425 if (bContext == null) {
1426 bc = flcontext_alloc();
1427 } else {
1428 bc = bContext;
1429 }
1430 if (eth != null) {
1431 IFloodlightProviderService.bcStore.put(bc,
1432 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1433 eth);
1434 }
1435
1436 // Get the starting time (overall and per-component) of
1437 // the processing chain for this packet if performance
1438 // monitoring is turned on
1439 pktinProcTime.bootstrap(listeners);
1440 pktinProcTime.recordStartTimePktIn();
1441 Command cmd;
1442 for (IOFMessageListener listener : listeners) {
1443 if (listener instanceof IOFSwitchFilter) {
1444 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1445 continue;
1446 }
1447 }
1448
1449 pktinProcTime.recordStartTimeComp(listener);
1450 cmd = listener.receive(sw, m, bc);
1451 pktinProcTime.recordEndTimeComp(listener);
1452
1453 if (Command.STOP.equals(cmd)) {
1454 break;
1455 }
1456 }
1457 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1458 } else {
1459 log.warn("Unhandled OF Message: {} from {}", m, sw);
1460 }
1461
1462 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1463 }
1464 }
1465
1466 /**
1467 * Log an OpenFlow error message from a switch
1468 * @param sw The switch that sent the error
1469 * @param error The error message
1470 */
1471 @LogMessageDoc(level="ERROR",
1472 message="Error {error type} {error code} from {switch}",
1473 explanation="The switch responded with an unexpected error" +
1474 "to an OpenFlow message from the controller",
1475 recommendation="This could indicate improper network operation. " +
1476 "If the problem persists restarting the switch and " +
1477 "controller may help."
1478 )
1479 protected void logError(IOFSwitch sw, OFError error) {
1480 int etint = 0xffff & error.getErrorType();
1481 if (etint < 0 || etint >= OFErrorType.values().length) {
1482 log.error("Unknown error code {} from sw {}", etint, sw);
1483 }
1484 OFErrorType et = OFErrorType.values()[etint];
1485 switch (et) {
1486 case OFPET_HELLO_FAILED:
1487 OFHelloFailedCode hfc =
1488 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1489 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1490 break;
1491 case OFPET_BAD_REQUEST:
1492 OFBadRequestCode brc =
1493 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1494 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1495 break;
1496 case OFPET_BAD_ACTION:
1497 OFBadActionCode bac =
1498 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1499 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1500 break;
1501 case OFPET_FLOW_MOD_FAILED:
1502 OFFlowModFailedCode fmfc =
1503 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1504 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1505 break;
1506 case OFPET_PORT_MOD_FAILED:
1507 OFPortModFailedCode pmfc =
1508 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1509 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1510 break;
1511 case OFPET_QUEUE_OP_FAILED:
1512 OFQueueOpFailedCode qofc =
1513 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1514 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1515 break;
1516 default:
1517 break;
1518 }
1519 }
1520
1521 /**
1522 * Add a switch to the active switch list and call the switch listeners.
1523 * This happens either when a switch first connects (and the controller is
1524 * not in the slave role) or when the role of the controller changes from
1525 * slave to master.
1526 * @param sw the switch that has been added
1527 */
1528 // TODO: need to rethink locking and the synchronous switch update.
1529 // We can / should also handle duplicate DPIDs in connectedSwitches
1530 @LogMessageDoc(level="ERROR",
1531 message="New switch added {switch} for already-added switch {switch}",
1532 explanation="A switch with the same DPID as another switch " +
1533 "connected to the controller. This can be caused by " +
1534 "multiple switches configured with the same DPID, or " +
1535 "by a switch reconnected very quickly after " +
1536 "disconnecting.",
1537 recommendation="If this happens repeatedly, it is likely there " +
1538 "are switches with duplicate DPIDs on the network. " +
1539 "Reconfigure the appropriate switches. If it happens " +
1540 "very rarely, then it is likely this is a transient " +
1541 "network problem that can be ignored."
1542 )
1543 protected void addSwitch(IOFSwitch sw) {
1544 // TODO: is it safe to modify the HashMap without holding
1545 // the old switch's lock?
1546 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1547 if (sw == oldSw) {
1548 // Note == for object equality, not .equals for value
1549 log.info("New add switch for pre-existing switch {}", sw);
1550 return;
1551 }
1552
1553 if (oldSw != null) {
1554 oldSw.getListenerWriteLock().lock();
1555 try {
1556 log.error("New switch added {} for already-added switch {}",
1557 sw, oldSw);
1558 // Set the connected flag to false to suppress calling
1559 // the listeners for this switch in processOFMessage
1560 oldSw.setConnected(false);
1561
1562 oldSw.cancelAllStatisticsReplies();
1563
1564 updateInactiveSwitchInfo(oldSw);
1565
1566 // we need to clean out old switch state definitively
1567 // before adding the new switch
1568 // FIXME: It seems not completely kosher to call the
1569 // switch listeners here. I thought one of the points of
1570 // having the asynchronous switch update mechanism was so
1571 // the addedSwitch and removedSwitch were always called
1572 // from a single thread to simplify concurrency issues
1573 // for the listener.
1574 if (switchListeners != null) {
1575 for (IOFSwitchListener listener : switchListeners) {
1576 listener.removedSwitch(oldSw);
1577 }
1578 }
1579 // will eventually trigger a removeSwitch(), which will cause
1580 // a "Not removing Switch ... already removed debug message.
1581 // TODO: Figure out a way to handle this that avoids the
1582 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001583 log.debug("Closing {} because a new IOFSwitch got added " +
1584 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001585 oldSw.getChannel().close();
1586 }
1587 finally {
1588 oldSw.getListenerWriteLock().unlock();
1589 }
1590 }
1591
1592 updateActiveSwitchInfo(sw);
1593 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1594 try {
1595 this.updates.put(update);
1596 } catch (InterruptedException e) {
1597 log.error("Failure adding update to queue", e);
1598 }
1599 }
1600
1601 /**
1602 * Remove a switch from the active switch list and call the switch listeners.
1603 * This happens either when the switch is disconnected or when the
1604 * controller's role for the switch changes from master to slave.
1605 * @param sw the switch that has been removed
1606 */
1607 protected void removeSwitch(IOFSwitch sw) {
1608 // No need to acquire the listener lock, since
1609 // this method is only called after netty has processed all
1610 // pending messages
1611 log.debug("removeSwitch: {}", sw);
1612 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1613 log.debug("Not removing switch {}; already removed", sw);
1614 return;
1615 }
1616 // We cancel all outstanding statistics replies if the switch transition
1617 // from active. In the future we might allow statistics requests
1618 // from slave controllers. Then we need to move this cancelation
1619 // to switch disconnect
1620 sw.cancelAllStatisticsReplies();
1621
1622 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1623 // here if role support is enabled. In that case if the switch is being
1624 // removed because we've been switched to being in the slave role, then I think
1625 // it's possible that the new master may have already been promoted to master
1626 // and written out the active switch state to storage. If we now execute
1627 // updateInactiveSwitchInfo we may wipe out all of the state that was
1628 // written out by the new master. Maybe need to revisit how we handle all
1629 // of the switch state that's written to storage.
1630
1631 updateInactiveSwitchInfo(sw);
1632 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1633 try {
1634 this.updates.put(update);
1635 } catch (InterruptedException e) {
1636 log.error("Failure adding update to queue", e);
1637 }
1638 }
1639
1640 // ***************
1641 // IFloodlightProvider
1642 // ***************
1643
1644 @Override
1645 public synchronized void addOFMessageListener(OFType type,
1646 IOFMessageListener listener) {
1647 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1648 messageListeners.get(type);
1649 if (ldd == null) {
1650 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1651 messageListeners.put(type, ldd);
1652 }
1653 ldd.addListener(type, listener);
1654 }
1655
1656 @Override
1657 public synchronized void removeOFMessageListener(OFType type,
1658 IOFMessageListener listener) {
1659 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1660 messageListeners.get(type);
1661 if (ldd != null) {
1662 ldd.removeListener(listener);
1663 }
1664 }
1665
1666 private void logListeners() {
1667 for (Map.Entry<OFType,
1668 ListenerDispatcher<OFType,
1669 IOFMessageListener>> entry
1670 : messageListeners.entrySet()) {
1671
1672 OFType type = entry.getKey();
1673 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1674 entry.getValue();
1675
1676 StringBuffer sb = new StringBuffer();
1677 sb.append("OFListeners for ");
1678 sb.append(type);
1679 sb.append(": ");
1680 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1681 sb.append(l.getName());
1682 sb.append(",");
1683 }
1684 log.debug(sb.toString());
1685 }
1686 }
1687
1688 public void removeOFMessageListeners(OFType type) {
1689 messageListeners.remove(type);
1690 }
1691
1692 @Override
1693 public Map<Long, IOFSwitch> getSwitches() {
1694 return Collections.unmodifiableMap(this.activeSwitches);
1695 }
1696
1697 @Override
1698 public void addOFSwitchListener(IOFSwitchListener listener) {
1699 this.switchListeners.add(listener);
1700 }
1701
1702 @Override
1703 public void removeOFSwitchListener(IOFSwitchListener listener) {
1704 this.switchListeners.remove(listener);
1705 }
1706
1707 @Override
1708 public Map<OFType, List<IOFMessageListener>> getListeners() {
1709 Map<OFType, List<IOFMessageListener>> lers =
1710 new HashMap<OFType, List<IOFMessageListener>>();
1711 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1712 messageListeners.entrySet()) {
1713 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1714 }
1715 return Collections.unmodifiableMap(lers);
1716 }
1717
1718 @Override
1719 @LogMessageDocs({
1720 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1721 "a null switch",
1722 explanation="Failed to process a message because the switch " +
1723 " is no longer connected."),
1724 @LogMessageDoc(level="ERROR",
1725 message="Error reinjecting OFMessage on switch {switch}",
1726 explanation="An I/O error occured while attempting to " +
1727 "process an OpenFlow message",
1728 recommendation=LogMessageDoc.CHECK_SWITCH)
1729 })
1730 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1731 FloodlightContext bc) {
1732 if (sw == null) {
1733 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1734 return false;
1735 }
1736
1737 // FIXME: Do we need to be able to inject messages to switches
1738 // where we're the slave controller (i.e. they're connected but
1739 // not active)?
1740 // FIXME: Don't we need synchronization logic here so we're holding
1741 // the listener read lock when we call handleMessage? After some
1742 // discussions it sounds like the right thing to do here would be to
1743 // inject the message as a netty upstream channel event so it goes
1744 // through the normal netty event processing, including being
1745 // handled
1746 if (!activeSwitches.containsKey(sw.getId())) return false;
1747
1748 try {
1749 // Pass Floodlight context to the handleMessages()
1750 handleMessage(sw, msg, bc);
1751 } catch (IOException e) {
1752 log.error("Error reinjecting OFMessage on switch {}",
1753 HexString.toHexString(sw.getId()));
1754 return false;
1755 }
1756 return true;
1757 }
1758
1759 @Override
1760 @LogMessageDoc(message="Calling System.exit",
1761 explanation="The controller is terminating")
1762 public synchronized void terminate() {
1763 log.info("Calling System.exit");
1764 System.exit(1);
1765 }
1766
1767 @Override
1768 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1769 // call the overloaded version with floodlight context set to null
1770 return injectOfMessage(sw, msg, null);
1771 }
1772
1773 @Override
1774 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1775 FloodlightContext bc) {
1776 if (log.isTraceEnabled()) {
1777 String str = OFMessage.getDataAsString(sw, m, bc);
1778 log.trace("{}", str);
1779 }
1780
1781 List<IOFMessageListener> listeners = null;
1782 if (messageListeners.containsKey(m.getType())) {
1783 listeners =
1784 messageListeners.get(m.getType()).getOrderedListeners();
1785 }
1786
1787 if (listeners != null) {
1788 for (IOFMessageListener listener : listeners) {
1789 if (listener instanceof IOFSwitchFilter) {
1790 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1791 continue;
1792 }
1793 }
1794 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1795 break;
1796 }
1797 }
1798 }
1799 }
1800
1801 @Override
1802 public BasicFactory getOFMessageFactory() {
1803 return factory;
1804 }
1805
1806 @Override
1807 public String getControllerId() {
1808 return controllerId;
1809 }
1810
1811 // **************
1812 // Initialization
1813 // **************
1814
1815 protected void updateAllInactiveSwitchInfo() {
1816 if (role == Role.SLAVE) {
1817 return;
1818 }
1819 String controllerId = getControllerId();
1820 String[] switchColumns = { SWITCH_DATAPATH_ID,
1821 SWITCH_CONTROLLER_ID,
1822 SWITCH_ACTIVE };
1823 String[] portColumns = { PORT_ID, PORT_SWITCH };
1824 IResultSet switchResultSet = null;
1825 try {
1826 OperatorPredicate op =
1827 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1828 OperatorPredicate.Operator.EQ,
1829 controllerId);
1830 switchResultSet =
1831 storageSource.executeQuery(SWITCH_TABLE_NAME,
1832 switchColumns,
1833 op, null);
1834 while (switchResultSet.next()) {
1835 IResultSet portResultSet = null;
1836 try {
1837 String datapathId =
1838 switchResultSet.getString(SWITCH_DATAPATH_ID);
1839 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1840 op = new OperatorPredicate(PORT_SWITCH,
1841 OperatorPredicate.Operator.EQ,
1842 datapathId);
1843 portResultSet =
1844 storageSource.executeQuery(PORT_TABLE_NAME,
1845 portColumns,
1846 op, null);
1847 while (portResultSet.next()) {
1848 portResultSet.deleteRow();
1849 }
1850 portResultSet.save();
1851 }
1852 finally {
1853 if (portResultSet != null)
1854 portResultSet.close();
1855 }
1856 }
1857 switchResultSet.save();
1858 }
1859 finally {
1860 if (switchResultSet != null)
1861 switchResultSet.close();
1862 }
1863 }
1864
1865 protected void updateControllerInfo() {
1866 updateAllInactiveSwitchInfo();
1867
1868 // Write out the controller info to the storage source
1869 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1870 String id = getControllerId();
1871 controllerInfo.put(CONTROLLER_ID, id);
1872 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1873 }
1874
1875 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1876 if (role == Role.SLAVE) {
1877 return;
1878 }
1879 // Obtain the row info for the switch
1880 Map<String, Object> switchInfo = new HashMap<String, Object>();
1881 String datapathIdString = sw.getStringId();
1882 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1883 String controllerId = getControllerId();
1884 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1885 Date connectedSince = sw.getConnectedSince();
1886 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1887 Channel channel = sw.getChannel();
1888 SocketAddress socketAddress = channel.getRemoteAddress();
1889 if (socketAddress != null) {
1890 String socketAddressString = socketAddress.toString();
1891 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1892 if (socketAddress instanceof InetSocketAddress) {
1893 InetSocketAddress inetSocketAddress =
1894 (InetSocketAddress)socketAddress;
1895 InetAddress inetAddress = inetSocketAddress.getAddress();
1896 String ip = inetAddress.getHostAddress();
1897 switchInfo.put(SWITCH_IP, ip);
1898 }
1899 }
1900
1901 // Write out the switch features info
1902 long capabilities = U32.f(sw.getCapabilities());
1903 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1904 long buffers = U32.f(sw.getBuffers());
1905 switchInfo.put(SWITCH_BUFFERS, buffers);
1906 long tables = U32.f(sw.getTables());
1907 switchInfo.put(SWITCH_TABLES, tables);
1908 long actions = U32.f(sw.getActions());
1909 switchInfo.put(SWITCH_ACTIONS, actions);
1910 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1911
1912 // Update the switch
1913 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1914
1915 // Update the ports
1916 for (OFPhysicalPort port: sw.getPorts()) {
1917 updatePortInfo(sw, port);
1918 }
1919 }
1920
1921 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1922 if (role == Role.SLAVE) {
1923 return;
1924 }
1925 log.debug("Update DB with inactiveSW {}", sw);
1926 // Update the controller info in the storage source to be inactive
1927 Map<String, Object> switchInfo = new HashMap<String, Object>();
1928 String datapathIdString = sw.getStringId();
1929 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1930 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1931 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1932 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1933 }
1934
1935 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1936 if (role == Role.SLAVE) {
1937 return;
1938 }
1939 String datapathIdString = sw.getStringId();
1940 Map<String, Object> portInfo = new HashMap<String, Object>();
1941 int portNumber = U16.f(port.getPortNumber());
1942 String id = datapathIdString + "|" + portNumber;
1943 portInfo.put(PORT_ID, id);
1944 portInfo.put(PORT_SWITCH, datapathIdString);
1945 portInfo.put(PORT_NUMBER, portNumber);
1946 byte[] hardwareAddress = port.getHardwareAddress();
1947 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1948 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1949 String name = port.getName();
1950 portInfo.put(PORT_NAME, name);
1951 long config = U32.f(port.getConfig());
1952 portInfo.put(PORT_CONFIG, config);
1953 long state = U32.f(port.getState());
1954 portInfo.put(PORT_STATE, state);
1955 long currentFeatures = U32.f(port.getCurrentFeatures());
1956 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1957 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1958 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1959 long supportedFeatures = U32.f(port.getSupportedFeatures());
1960 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1961 long peerFeatures = U32.f(port.getPeerFeatures());
1962 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1963 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1964 }
1965
1966 /**
1967 * Read switch port data from storage and write it into a switch object
1968 * @param sw the switch to update
1969 */
1970 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1971 OperatorPredicate op =
1972 new OperatorPredicate(PORT_SWITCH,
1973 OperatorPredicate.Operator.EQ,
1974 sw.getStringId());
1975 IResultSet portResultSet =
1976 storageSource.executeQuery(PORT_TABLE_NAME,
1977 null, op, null);
1978 //Map<Short, OFPhysicalPort> oldports =
1979 // new HashMap<Short, OFPhysicalPort>();
1980 //oldports.putAll(sw.getPorts());
1981
1982 while (portResultSet.next()) {
1983 try {
1984 OFPhysicalPort p = new OFPhysicalPort();
1985 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1986 p.setName(portResultSet.getString(PORT_NAME));
1987 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1988 p.setState((int)portResultSet.getLong(PORT_STATE));
1989 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1990 p.setHardwareAddress(HexString.fromHexString(portMac));
1991 p.setCurrentFeatures((int)portResultSet.
1992 getLong(PORT_CURRENT_FEATURES));
1993 p.setAdvertisedFeatures((int)portResultSet.
1994 getLong(PORT_ADVERTISED_FEATURES));
1995 p.setSupportedFeatures((int)portResultSet.
1996 getLong(PORT_SUPPORTED_FEATURES));
1997 p.setPeerFeatures((int)portResultSet.
1998 getLong(PORT_PEER_FEATURES));
1999 //oldports.remove(Short.valueOf(p.getPortNumber()));
2000 sw.setPort(p);
2001 } catch (NullPointerException e) {
2002 // ignore
2003 }
2004 }
2005 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
2006 try {
2007 this.updates.put(update);
2008 } catch (InterruptedException e) {
2009 log.error("Failure adding update to queue", e);
2010 }
2011 }
2012
2013 protected void removePortInfo(IOFSwitch sw, short portNumber) {
2014 if (role == Role.SLAVE) {
2015 return;
2016 }
2017 String datapathIdString = sw.getStringId();
2018 String id = datapathIdString + "|" + portNumber;
2019 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
2020 }
2021
2022 /**
2023 * Sets the initial role based on properties in the config params.
2024 * It looks for two different properties.
2025 * If the "role" property is specified then the value should be
2026 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
2027 * controller is set to the specified value. If the "role" property
2028 * is not specified then it looks next for the "role.path" property.
2029 * In this case the value should be the path to a property file in
2030 * the file system that contains a property called "floodlight.role"
2031 * which can be one of the values listed above for the "role" property.
2032 * The idea behind the "role.path" mechanism is that you have some
2033 * separate heartbeat and master controller election algorithm that
2034 * determines the role of the controller. When a role transition happens,
2035 * it updates the current role in the file specified by the "role.path"
2036 * file. Then if floodlight restarts for some reason it can get the
2037 * correct current role of the controller from the file.
2038 * @param configParams The config params for the FloodlightProvider service
2039 * @return A valid role if role information is specified in the
2040 * config params, otherwise null
2041 */
2042 @LogMessageDocs({
2043 @LogMessageDoc(message="Controller role set to {role}",
2044 explanation="Setting the initial HA role to "),
2045 @LogMessageDoc(level="ERROR",
2046 message="Invalid current role value: {role}",
2047 explanation="An invalid HA role value was read from the " +
2048 "properties file",
2049 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2050 })
2051 protected Role getInitialRole(Map<String, String> configParams) {
2052 Role role = null;
2053 String roleString = configParams.get("role");
2054 if (roleString == null) {
2055 String rolePath = configParams.get("rolepath");
2056 if (rolePath != null) {
2057 Properties properties = new Properties();
2058 try {
2059 properties.load(new FileInputStream(rolePath));
2060 roleString = properties.getProperty("floodlight.role");
2061 }
2062 catch (IOException exc) {
2063 // Don't treat it as an error if the file specified by the
2064 // rolepath property doesn't exist. This lets us enable the
2065 // HA mechanism by just creating/setting the floodlight.role
2066 // property in that file without having to modify the
2067 // floodlight properties.
2068 }
2069 }
2070 }
2071
2072 if (roleString != null) {
2073 // Canonicalize the string to the form used for the enum constants
2074 roleString = roleString.trim().toUpperCase();
2075 try {
2076 role = Role.valueOf(roleString);
2077 }
2078 catch (IllegalArgumentException exc) {
2079 log.error("Invalid current role value: {}", roleString);
2080 }
2081 }
2082
2083 log.info("Controller role set to {}", role);
2084
2085 return role;
2086 }
2087
2088 /**
2089 * Tell controller that we're ready to accept switches loop
2090 * @throws IOException
2091 */
2092 @LogMessageDocs({
2093 @LogMessageDoc(message="Listening for switch connections on {address}",
2094 explanation="The controller is ready and listening for new" +
2095 " switch connections"),
2096 @LogMessageDoc(message="Storage exception in controller " +
2097 "updates loop; terminating process",
2098 explanation=ERROR_DATABASE,
2099 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2100 @LogMessageDoc(level="ERROR",
2101 message="Exception in controller updates loop",
2102 explanation="Failed to dispatch controller event",
2103 recommendation=LogMessageDoc.GENERIC_ACTION)
2104 })
2105 public void run() {
2106 if (log.isDebugEnabled()) {
2107 logListeners();
2108 }
2109
2110 try {
2111 final ServerBootstrap bootstrap = createServerBootStrap();
2112
2113 bootstrap.setOption("reuseAddr", true);
2114 bootstrap.setOption("child.keepAlive", true);
2115 bootstrap.setOption("child.tcpNoDelay", true);
2116 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2117
2118 ChannelPipelineFactory pfact =
2119 new OpenflowPipelineFactory(this, null);
2120 bootstrap.setPipelineFactory(pfact);
2121 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2122 final ChannelGroup cg = new DefaultChannelGroup();
2123 cg.add(bootstrap.bind(sa));
2124
2125 log.info("Listening for switch connections on {}", sa);
2126 } catch (Exception e) {
2127 throw new RuntimeException(e);
2128 }
2129
2130 // main loop
2131 while (true) {
2132 try {
2133 IUpdate update = updates.take();
2134 update.dispatch();
2135 } catch (InterruptedException e) {
2136 return;
2137 } catch (StorageException e) {
2138 log.error("Storage exception in controller " +
2139 "updates loop; terminating process", e);
2140 return;
2141 } catch (Exception e) {
2142 log.error("Exception in controller updates loop", e);
2143 }
2144 }
2145 }
2146
2147 private ServerBootstrap createServerBootStrap() {
2148 if (workerThreads == 0) {
2149 return new ServerBootstrap(
2150 new NioServerSocketChannelFactory(
2151 Executors.newCachedThreadPool(),
2152 Executors.newCachedThreadPool()));
2153 } else {
2154 return new ServerBootstrap(
2155 new NioServerSocketChannelFactory(
2156 Executors.newCachedThreadPool(),
2157 Executors.newCachedThreadPool(), workerThreads));
2158 }
2159 }
2160
2161 public void setConfigParams(Map<String, String> configParams) {
2162 String ofPort = configParams.get("openflowport");
2163 if (ofPort != null) {
2164 this.openFlowPort = Integer.parseInt(ofPort);
2165 }
2166 log.debug("OpenFlow port set to {}", this.openFlowPort);
2167 String threads = configParams.get("workerthreads");
2168 if (threads != null) {
2169 this.workerThreads = Integer.parseInt(threads);
2170 }
2171 log.debug("Number of worker threads set to {}", this.workerThreads);
2172 String controllerId = configParams.get("controllerid");
2173 if (controllerId != null) {
2174 this.controllerId = controllerId;
2175 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002176 else {
2177 //Try to get the hostname of the machine and use that for controller ID
2178 try {
2179 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2180 this.controllerId = hostname;
2181 } catch (UnknownHostException e) {
2182 // Can't get hostname, we'll just use the default
2183 }
2184 }
2185
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002186 log.debug("ControllerId set to {}", this.controllerId);
2187 }
2188
2189 private void initVendorMessages() {
2190 // Configure openflowj to be able to parse the role request/reply
2191 // vendor messages.
2192 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2193 OFNiciraVendorData.NX_VENDOR_ID, 4);
2194 OFVendorId.registerVendorId(niciraVendorId);
2195 OFBasicVendorDataType roleRequestVendorData =
2196 new OFBasicVendorDataType(
2197 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2198 OFRoleRequestVendorData.getInstantiable());
2199 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2200 OFBasicVendorDataType roleReplyVendorData =
2201 new OFBasicVendorDataType(
2202 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2203 OFRoleReplyVendorData.getInstantiable());
2204 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2205 }
2206
2207 /**
2208 * Initialize internal data structures
2209 */
2210 public void init(Map<String, String> configParams) {
2211 // These data structures are initialized here because other
2212 // module's startUp() might be called before ours
2213 this.messageListeners =
2214 new ConcurrentHashMap<OFType,
2215 ListenerDispatcher<OFType,
2216 IOFMessageListener>>();
2217 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2218 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2219 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2220 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2221 this.controllerNodeIPsCache = new HashMap<String, String>();
2222 this.updates = new LinkedBlockingQueue<IUpdate>();
2223 this.factory = new BasicFactory();
2224 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2225 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002226 //this.role = getInitialRole(configParams);
2227 //Set the controller's role to MASTER so it always tries to do role requests.
2228 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002229 this.roleChanger = new RoleChanger();
2230 initVendorMessages();
2231 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002232 }
2233
2234 /**
2235 * Startup all of the controller's components
2236 */
2237 @LogMessageDoc(message="Waiting for storage source",
2238 explanation="The system database is not yet ready",
2239 recommendation="If this message persists, this indicates " +
2240 "that the system database has failed to start. " +
2241 LogMessageDoc.CHECK_CONTROLLER)
2242 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002243 try {
2244 registryService.registerController(controllerId);
2245 } catch (RegistryException e2) {
2246 log.warn("Registry service error: {}", e2.getMessage());
2247 }
2248
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002249 // Create the table names we use
2250 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2251 storageSource.createTable(SWITCH_TABLE_NAME, null);
2252 storageSource.createTable(PORT_TABLE_NAME, null);
2253 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2254 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2255 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2256 CONTROLLER_ID);
2257 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2258 SWITCH_DATAPATH_ID);
2259 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2260 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2261 CONTROLLER_INTERFACE_ID);
2262 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2263
2264 while (true) {
2265 try {
2266 updateControllerInfo();
2267 break;
2268 }
2269 catch (StorageException e) {
2270 log.info("Waiting for storage source");
2271 try {
2272 Thread.sleep(1000);
2273 } catch (InterruptedException e1) {
2274 }
2275 }
2276 }
2277
2278 // Add our REST API
2279 restApi.addRestletRoutable(new CoreWebRoutable());
2280 }
2281
2282 @Override
2283 public void addInfoProvider(String type, IInfoProvider provider) {
2284 if (!providerMap.containsKey(type)) {
2285 providerMap.put(type, new ArrayList<IInfoProvider>());
2286 }
2287 providerMap.get(type).add(provider);
2288 }
2289
2290 @Override
2291 public void removeInfoProvider(String type, IInfoProvider provider) {
2292 if (!providerMap.containsKey(type)) {
2293 log.debug("Provider type {} doesn't exist.", type);
2294 return;
2295 }
2296
2297 providerMap.get(type).remove(provider);
2298 }
2299
2300 public Map<String, Object> getControllerInfo(String type) {
2301 if (!providerMap.containsKey(type)) return null;
2302
2303 Map<String, Object> result = new LinkedHashMap<String, Object>();
2304 for (IInfoProvider provider : providerMap.get(type)) {
2305 result.putAll(provider.getInfo(type));
2306 }
2307
2308 return result;
2309 }
2310
2311 @Override
2312 public void addHAListener(IHAListener listener) {
2313 this.haListeners.add(listener);
2314 }
2315
2316 @Override
2317 public void removeHAListener(IHAListener listener) {
2318 this.haListeners.remove(listener);
2319 }
2320
2321
2322 /**
2323 * Handle changes to the controller nodes IPs and dispatch update.
2324 */
2325 @SuppressWarnings("unchecked")
2326 protected void handleControllerNodeIPChanges() {
2327 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2328 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2329 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2330 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2331 CONTROLLER_INTERFACE_TYPE,
2332 CONTROLLER_INTERFACE_NUMBER,
2333 CONTROLLER_INTERFACE_DISCOVERED_IP };
2334 synchronized(controllerNodeIPsCache) {
2335 // We currently assume that interface Ethernet0 is the relevant
2336 // controller interface. Might change.
2337 // We could (should?) implement this using
2338 // predicates, but creating the individual and compound predicate
2339 // seems more overhead then just checking every row. Particularly,
2340 // since the number of rows is small and changes infrequent
2341 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2342 colNames,null, null);
2343 while (res.next()) {
2344 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2345 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2346 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2347 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2348 String curIP = controllerNodeIPsCache.get(controllerID);
2349
2350 curControllerNodeIPs.put(controllerID, discoveredIP);
2351 if (curIP == null) {
2352 // new controller node IP
2353 addedControllerNodeIPs.put(controllerID, discoveredIP);
2354 }
2355 else if (curIP != discoveredIP) {
2356 // IP changed
2357 removedControllerNodeIPs.put(controllerID, curIP);
2358 addedControllerNodeIPs.put(controllerID, discoveredIP);
2359 }
2360 }
2361 }
2362 // Now figure out if rows have been deleted. We can't use the
2363 // rowKeys from rowsDeleted directly, since the tables primary
2364 // key is a compound that we can't disassemble
2365 Set<String> curEntries = curControllerNodeIPs.keySet();
2366 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2367 removedEntries.removeAll(curEntries);
2368 for (String removedControllerID : removedEntries)
2369 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2370 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2371 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2372 curControllerNodeIPs, addedControllerNodeIPs,
2373 removedControllerNodeIPs);
2374 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2375 try {
2376 this.updates.put(update);
2377 } catch (InterruptedException e) {
2378 log.error("Failure adding update to queue", e);
2379 }
2380 }
2381 }
2382 }
2383
2384 @Override
2385 public Map<String, String> getControllerNodeIPs() {
2386 // We return a copy of the mapping so we can guarantee that
2387 // the mapping return is the same as one that will be (or was)
2388 // dispatched to IHAListeners
2389 HashMap<String,String> retval = new HashMap<String,String>();
2390 synchronized(controllerNodeIPsCache) {
2391 retval.putAll(controllerNodeIPsCache);
2392 }
2393 return retval;
2394 }
2395
2396 @Override
2397 public void rowsModified(String tableName, Set<Object> rowKeys) {
2398 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2399 handleControllerNodeIPChanges();
2400 }
2401
2402 }
2403
2404 @Override
2405 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2406 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2407 handleControllerNodeIPChanges();
2408 }
2409 }
2410
2411 @Override
2412 public long getSystemStartTime() {
2413 return (this.systemStartTime);
2414 }
2415
2416 @Override
2417 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2418 this.alwaysClearFlowsOnSwAdd = value;
2419 }
2420
2421 public boolean getAlwaysClearFlowsOnSwAdd() {
2422 return this.alwaysClearFlowsOnSwAdd;
2423 }
2424}