blob: d9fa6668de7dd60a6dd65e09ec5bcb106369fd48 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080058import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
61import net.floodlightcontroller.core.annotations.LogMessageDoc;
62import net.floodlightcontroller.core.annotations.LogMessageDocs;
63import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
64import net.floodlightcontroller.core.util.ListenerDispatcher;
65import net.floodlightcontroller.core.web.CoreWebRoutable;
66import net.floodlightcontroller.counter.ICounterStoreService;
67import net.floodlightcontroller.packet.Ethernet;
68import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
69import net.floodlightcontroller.restserver.IRestApiService;
70import net.floodlightcontroller.storage.IResultSet;
71import net.floodlightcontroller.storage.IStorageSourceListener;
72import net.floodlightcontroller.storage.IStorageSourceService;
73import net.floodlightcontroller.storage.OperatorPredicate;
74import net.floodlightcontroller.storage.StorageException;
75import net.floodlightcontroller.threadpool.IThreadPoolService;
HIGUCHI Yuta20514902013-06-12 11:24:16 -070076import net.onrc.onos.ofcontroller.core.INetMapTopologyService.ITopoRouteService;
HIGUCHI Yutaedf81d72013-06-12 12:06:57 -070077import net.onrc.onos.ofcontroller.flowcache.IFlowService;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080078import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080079import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080080import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080081
82import org.jboss.netty.bootstrap.ServerBootstrap;
83import org.jboss.netty.buffer.ChannelBuffer;
84import org.jboss.netty.buffer.ChannelBuffers;
85import org.jboss.netty.channel.Channel;
86import org.jboss.netty.channel.ChannelHandlerContext;
87import org.jboss.netty.channel.ChannelPipelineFactory;
88import org.jboss.netty.channel.ChannelStateEvent;
89import org.jboss.netty.channel.ChannelUpstreamHandler;
90import org.jboss.netty.channel.Channels;
91import org.jboss.netty.channel.ExceptionEvent;
92import org.jboss.netty.channel.MessageEvent;
93import org.jboss.netty.channel.group.ChannelGroup;
94import org.jboss.netty.channel.group.DefaultChannelGroup;
95import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
96import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
97import org.jboss.netty.handler.timeout.IdleStateEvent;
98import org.jboss.netty.handler.timeout.ReadTimeoutException;
99import org.openflow.protocol.OFEchoReply;
100import org.openflow.protocol.OFError;
101import org.openflow.protocol.OFError.OFBadActionCode;
102import org.openflow.protocol.OFError.OFBadRequestCode;
103import org.openflow.protocol.OFError.OFErrorType;
104import org.openflow.protocol.OFError.OFFlowModFailedCode;
105import org.openflow.protocol.OFError.OFHelloFailedCode;
106import org.openflow.protocol.OFError.OFPortModFailedCode;
107import org.openflow.protocol.OFError.OFQueueOpFailedCode;
108import org.openflow.protocol.OFFeaturesReply;
109import org.openflow.protocol.OFGetConfigReply;
110import org.openflow.protocol.OFMessage;
111import org.openflow.protocol.OFPacketIn;
112import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800113import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800114import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800115import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800116import org.openflow.protocol.OFPortStatus.OFPortReason;
117import org.openflow.protocol.OFSetConfig;
118import org.openflow.protocol.OFStatisticsRequest;
119import org.openflow.protocol.OFSwitchConfig;
120import org.openflow.protocol.OFType;
121import org.openflow.protocol.OFVendor;
122import org.openflow.protocol.factory.BasicFactory;
123import org.openflow.protocol.factory.MessageParseException;
124import org.openflow.protocol.statistics.OFDescriptionStatistics;
125import org.openflow.protocol.statistics.OFStatistics;
126import org.openflow.protocol.statistics.OFStatisticsType;
127import org.openflow.protocol.vendor.OFBasicVendorDataType;
128import org.openflow.protocol.vendor.OFBasicVendorId;
129import org.openflow.protocol.vendor.OFVendorId;
130import org.openflow.util.HexString;
131import org.openflow.util.U16;
132import org.openflow.util.U32;
133import org.openflow.vendor.nicira.OFNiciraVendorData;
134import org.openflow.vendor.nicira.OFRoleReplyVendorData;
135import org.openflow.vendor.nicira.OFRoleRequestVendorData;
136import org.openflow.vendor.nicira.OFRoleVendorData;
137import org.slf4j.Logger;
138import org.slf4j.LoggerFactory;
139
140
141/**
142 * The main controller class. Handles all setup and network listeners
143 */
144public class Controller implements IFloodlightProviderService,
145 IStorageSourceListener {
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700146
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800147 protected static Logger log = LoggerFactory.getLogger(Controller.class);
148
149 private static final String ERROR_DATABASE =
150 "The controller could not communicate with the system database.";
151
152 protected BasicFactory factory;
153 protected ConcurrentMap<OFType,
154 ListenerDispatcher<OFType,IOFMessageListener>>
155 messageListeners;
156 // The activeSwitches map contains only those switches that are actively
157 // being controlled by us -- it doesn't contain switches that are
158 // in the slave role
159 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
160 // connectedSwitches contains all connected switches, including ones where
161 // we're a slave controller. We need to keep track of them so that we can
162 // send role request messages to switches when our role changes to master
163 // We add a switch to this set after it successfully completes the
164 // handshake. Access to this Set needs to be synchronized with roleChanger
165 protected HashSet<OFSwitchImpl> connectedSwitches;
166
167 // The controllerNodeIPsCache maps Controller IDs to their IP address.
168 // It's only used by handleControllerNodeIPsChanged
169 protected HashMap<String, String> controllerNodeIPsCache;
170
171 protected Set<IOFSwitchListener> switchListeners;
172 protected Set<IHAListener> haListeners;
173 protected Map<String, List<IInfoProvider>> providerMap;
174 protected BlockingQueue<IUpdate> updates;
175
176 // Module dependencies
177 protected IRestApiService restApi;
178 protected ICounterStoreService counterStore = null;
179 protected IStorageSourceService storageSource;
180 protected IPktInProcessingTimeService pktinProcTime;
181 protected IThreadPoolService threadPool;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800182 protected IFlowService flowService;
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800183 protected ITopoRouteService topoRouteService;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800184 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800185
186 // Configuration options
187 protected int openFlowPort = 6633;
188 protected int workerThreads = 0;
189 // The id for this controller node. Should be unique for each controller
190 // node in a controller cluster.
191 protected String controllerId = "localhost";
192
193 // The current role of the controller.
194 // If the controller isn't configured to support roles, then this is null.
195 protected Role role;
196 // A helper that handles sending and timeout handling for role requests
197 protected RoleChanger roleChanger;
198
199 // Start time of the controller
200 protected long systemStartTime;
201
202 // Flag to always flush flow table on switch reconnect (HA or otherwise)
203 protected boolean alwaysClearFlowsOnSwAdd = false;
204
205 // Storage table names
206 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
207 protected static final String CONTROLLER_ID = "id";
208
209 protected static final String SWITCH_TABLE_NAME = "controller_switch";
210 protected static final String SWITCH_DATAPATH_ID = "dpid";
211 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
212 protected static final String SWITCH_IP = "ip";
213 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
214 protected static final String SWITCH_ACTIVE = "active";
215 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
216 protected static final String SWITCH_CAPABILITIES = "capabilities";
217 protected static final String SWITCH_BUFFERS = "buffers";
218 protected static final String SWITCH_TABLES = "tables";
219 protected static final String SWITCH_ACTIONS = "actions";
220
221 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
222 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
223
224 protected static final String PORT_TABLE_NAME = "controller_port";
225 protected static final String PORT_ID = "id";
226 protected static final String PORT_SWITCH = "switch_id";
227 protected static final String PORT_NUMBER = "number";
228 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
229 protected static final String PORT_NAME = "name";
230 protected static final String PORT_CONFIG = "config";
231 protected static final String PORT_STATE = "state";
232 protected static final String PORT_CURRENT_FEATURES = "current_features";
233 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
234 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
235 protected static final String PORT_PEER_FEATURES = "peer_features";
236
237 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
238 protected static final String CONTROLLER_INTERFACE_ID = "id";
239 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
240 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
241 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
242 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
243
244
245
246 // Perf. related configuration
247 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
248 protected static final int BATCH_MAX_SIZE = 100;
249 protected static final boolean ALWAYS_DECODE_ETH = true;
250
251 /**
252 * Updates handled by the main loop
253 */
254 protected interface IUpdate {
255 /**
256 * Calls the appropriate listeners
257 */
258 public void dispatch();
259 }
260 public enum SwitchUpdateType {
261 ADDED,
262 REMOVED,
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700263 PORTCHANGED,
264 PORTADDED,
265 PORTREMOVED
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800266 }
267 /**
268 * Update message indicating a switch was added or removed
269 */
270 protected class SwitchUpdate implements IUpdate {
271 public IOFSwitch sw;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700272 public OFPhysicalPort port;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800273 public SwitchUpdateType switchUpdateType;
274 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
275 this.sw = sw;
276 this.switchUpdateType = switchUpdateType;
277 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700278 public SwitchUpdate(IOFSwitch sw, OFPhysicalPort port, SwitchUpdateType switchUpdateType) {
279 this.sw = sw;
280 this.port = port;
281 this.switchUpdateType = switchUpdateType;
282 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800283 public void dispatch() {
284 if (log.isTraceEnabled()) {
285 log.trace("Dispatching switch update {} {}",
286 sw, switchUpdateType);
287 }
288 if (switchListeners != null) {
289 for (IOFSwitchListener listener : switchListeners) {
290 switch(switchUpdateType) {
291 case ADDED:
292 listener.addedSwitch(sw);
293 break;
294 case REMOVED:
295 listener.removedSwitch(sw);
296 break;
297 case PORTCHANGED:
298 listener.switchPortChanged(sw.getId());
299 break;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700300 case PORTADDED:
301 listener.switchPortAdded(sw.getId(), port);
302 break;
303 case PORTREMOVED:
304 listener.switchPortRemoved(sw.getId(), port);
305 break;
306 default:
307 break;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800308 }
309 }
310 }
311 }
312 }
313
314 /**
315 * Update message indicating controller's role has changed
316 */
317 protected class HARoleUpdate implements IUpdate {
318 public Role oldRole;
319 public Role newRole;
320 public HARoleUpdate(Role newRole, Role oldRole) {
321 this.oldRole = oldRole;
322 this.newRole = newRole;
323 }
324 public void dispatch() {
325 // Make sure that old and new roles are different.
326 if (oldRole == newRole) {
327 if (log.isTraceEnabled()) {
328 log.trace("HA role update ignored as the old and " +
329 "new roles are the same. newRole = {}" +
330 "oldRole = {}", newRole, oldRole);
331 }
332 return;
333 }
334 if (log.isTraceEnabled()) {
335 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
336 newRole, oldRole);
337 }
338 if (haListeners != null) {
339 for (IHAListener listener : haListeners) {
340 listener.roleChanged(oldRole, newRole);
341 }
342 }
343 }
344 }
345
346 /**
347 * Update message indicating
348 * IPs of controllers in controller cluster have changed.
349 */
350 protected class HAControllerNodeIPUpdate implements IUpdate {
351 public Map<String,String> curControllerNodeIPs;
352 public Map<String,String> addedControllerNodeIPs;
353 public Map<String,String> removedControllerNodeIPs;
354 public HAControllerNodeIPUpdate(
355 HashMap<String,String> curControllerNodeIPs,
356 HashMap<String,String> addedControllerNodeIPs,
357 HashMap<String,String> removedControllerNodeIPs) {
358 this.curControllerNodeIPs = curControllerNodeIPs;
359 this.addedControllerNodeIPs = addedControllerNodeIPs;
360 this.removedControllerNodeIPs = removedControllerNodeIPs;
361 }
362 public void dispatch() {
363 if (log.isTraceEnabled()) {
364 log.trace("Dispatching HA Controller Node IP update "
365 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
366 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
367 removedControllerNodeIPs }
368 );
369 }
370 if (haListeners != null) {
371 for (IHAListener listener: haListeners) {
372 listener.controllerNodeIPsChanged(curControllerNodeIPs,
373 addedControllerNodeIPs, removedControllerNodeIPs);
374 }
375 }
376 }
377 }
378
379 // ***************
380 // Getters/Setters
381 // ***************
382
383 public void setStorageSourceService(IStorageSourceService storageSource) {
384 this.storageSource = storageSource;
385 }
386
387 public void setCounterStore(ICounterStoreService counterStore) {
388 this.counterStore = counterStore;
389 }
390
391 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
392 this.pktinProcTime = pits;
393 }
394
395 public void setRestApiService(IRestApiService restApi) {
396 this.restApi = restApi;
397 }
398
399 public void setThreadPoolService(IThreadPoolService tp) {
400 this.threadPool = tp;
401 }
402
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800403 public void setFlowService(IFlowService serviceImpl) {
404 this.flowService = serviceImpl;
405 }
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800406
407 public void setTopoRouteService(ITopoRouteService serviceImpl) {
408 this.topoRouteService = serviceImpl;
409 }
Jonathan Hartc2e95ee2013-02-22 15:25:11 -0800410
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800411 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800412 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800413 }
414
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800415 @Override
416 public Role getRole() {
417 synchronized(roleChanger) {
418 return role;
419 }
420 }
421
422 @Override
423 public void setRole(Role role) {
424 if (role == null) throw new NullPointerException("Role can not be null.");
425 if (role == Role.MASTER && this.role == Role.SLAVE) {
426 // Reset db state to Inactive for all switches.
427 updateAllInactiveSwitchInfo();
428 }
429
430 // Need to synchronize to ensure a reliable ordering on role request
431 // messages send and to ensure the list of connected switches is stable
432 // RoleChanger will handle the actual sending of the message and
433 // timeout handling
434 // @see RoleChanger
435 synchronized(roleChanger) {
436 if (role.equals(this.role)) {
437 log.debug("Ignoring role change: role is already {}", role);
438 return;
439 }
440
441 Role oldRole = this.role;
442 this.role = role;
443
444 log.debug("Submitting role change request to role {}", role);
445 roleChanger.submitRequest(connectedSwitches, role);
446
447 // Enqueue an update for our listeners.
448 try {
449 this.updates.put(new HARoleUpdate(role, oldRole));
450 } catch (InterruptedException e) {
451 log.error("Failure adding update to queue", e);
452 }
453 }
454 }
455
456
457
458 // **********************
459 // ChannelUpstreamHandler
460 // **********************
461
462 /**
463 * Return a new channel handler for processing a switch connections
464 * @param state The channel state object for the connection
465 * @return the new channel handler
466 */
467 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
468 return new OFChannelHandler(state);
469 }
470
Jonathan Hartcc957a02013-02-26 10:39:04 -0800471 protected class RoleChangeCallback implements ControlChangeCallback {
472 @Override
473 public void controlChanged(long dpid, boolean hasControl) {
474 log.info("Role change callback for switch {}, hasControl {}",
475 HexString.toHexString(dpid), hasControl);
476
477 synchronized(roleChanger){
478 OFSwitchImpl sw = null;
479 for (OFSwitchImpl connectedSw : connectedSwitches){
480 if (connectedSw.getId() == dpid){
481 sw = connectedSw;
482 break;
483 }
484 }
485 if (sw == null){
486 log.warn("Switch {} not found in connected switches",
487 HexString.toHexString(dpid));
488 return;
489 }
490
491 Role role = null;
492
Pankaj Berde01939e92013-03-08 14:38:27 -0800493 /*
494 * issue #229
495 * Cannot rely on sw.getRole() as it can be behind due to pending
496 * role changes in the queue. Just submit it and late the RoleChanger
497 * handle duplicates.
498 */
499
500 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800501 role = Role.MASTER;
502 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800503 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800504 role = Role.SLAVE;
505 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800506
507 log.debug("Sending role request {} msg to {}", role, sw);
508 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
509 swList.add(sw);
510 roleChanger.submitRequest(swList, role);
511
Jonathan Hartcc957a02013-02-26 10:39:04 -0800512 }
513
514 }
515 }
516
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800517 /**
518 * Channel handler deals with the switch connection and dispatches
519 * switch messages to the appropriate locations.
520 * @author readams
521 */
522 protected class OFChannelHandler
523 extends IdleStateAwareChannelUpstreamHandler {
524 protected OFSwitchImpl sw;
525 protected OFChannelState state;
526
527 public OFChannelHandler(OFChannelState state) {
528 this.state = state;
529 }
530
531 @Override
532 @LogMessageDoc(message="New switch connection from {ip address}",
533 explanation="A new switch has connected from the " +
534 "specified IP address")
535 public void channelConnected(ChannelHandlerContext ctx,
536 ChannelStateEvent e) throws Exception {
537 log.info("New switch connection from {}",
538 e.getChannel().getRemoteAddress());
539
540 sw = new OFSwitchImpl();
541 sw.setChannel(e.getChannel());
542 sw.setFloodlightProvider(Controller.this);
543 sw.setThreadPoolService(threadPool);
544
545 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
546 msglist.add(factory.getMessage(OFType.HELLO));
547 e.getChannel().write(msglist);
548
549 }
550
551 @Override
552 @LogMessageDoc(message="Disconnected switch {switch information}",
553 explanation="The specified switch has disconnected.")
554 public void channelDisconnected(ChannelHandlerContext ctx,
555 ChannelStateEvent e) throws Exception {
556 if (sw != null && state.hsState == HandshakeState.READY) {
557 if (activeSwitches.containsKey(sw.getId())) {
558 // It's safe to call removeSwitch even though the map might
559 // not contain this particular switch but another with the
560 // same DPID
561 removeSwitch(sw);
562 }
563 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700564 if (controlRequested) {
565 registryService.releaseControl(sw.getId());
566 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800567 connectedSwitches.remove(sw);
568 }
569 sw.setConnected(false);
570 }
571 log.info("Disconnected switch {}", sw);
572 }
573
574 @Override
575 @LogMessageDocs({
576 @LogMessageDoc(level="ERROR",
577 message="Disconnecting switch {switch} due to read timeout",
578 explanation="The connected switch has failed to send any " +
579 "messages or respond to echo requests",
580 recommendation=LogMessageDoc.CHECK_SWITCH),
581 @LogMessageDoc(level="ERROR",
582 message="Disconnecting switch {switch}: failed to " +
583 "complete handshake",
584 explanation="The switch did not respond correctly " +
585 "to handshake messages",
586 recommendation=LogMessageDoc.CHECK_SWITCH),
587 @LogMessageDoc(level="ERROR",
588 message="Disconnecting switch {switch} due to IO Error: {}",
589 explanation="There was an error communicating with the switch",
590 recommendation=LogMessageDoc.CHECK_SWITCH),
591 @LogMessageDoc(level="ERROR",
592 message="Disconnecting switch {switch} due to switch " +
593 "state error: {error}",
594 explanation="The switch sent an unexpected message",
595 recommendation=LogMessageDoc.CHECK_SWITCH),
596 @LogMessageDoc(level="ERROR",
597 message="Disconnecting switch {switch} due to " +
598 "message parse failure",
599 explanation="Could not parse a message from the switch",
600 recommendation=LogMessageDoc.CHECK_SWITCH),
601 @LogMessageDoc(level="ERROR",
602 message="Terminating controller due to storage exception",
603 explanation=ERROR_DATABASE,
604 recommendation=LogMessageDoc.CHECK_CONTROLLER),
605 @LogMessageDoc(level="ERROR",
606 message="Could not process message: queue full",
607 explanation="OpenFlow messages are arriving faster than " +
608 " the controller can process them.",
609 recommendation=LogMessageDoc.CHECK_CONTROLLER),
610 @LogMessageDoc(level="ERROR",
611 message="Error while processing message " +
612 "from switch {switch} {cause}",
613 explanation="An error occurred processing the switch message",
614 recommendation=LogMessageDoc.GENERIC_ACTION)
615 })
616 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
617 throws Exception {
618 if (e.getCause() instanceof ReadTimeoutException) {
619 // switch timeout
620 log.error("Disconnecting switch {} due to read timeout", sw);
621 ctx.getChannel().close();
622 } else if (e.getCause() instanceof HandshakeTimeoutException) {
623 log.error("Disconnecting switch {}: failed to complete handshake",
624 sw);
625 ctx.getChannel().close();
626 } else if (e.getCause() instanceof ClosedChannelException) {
627 //log.warn("Channel for sw {} already closed", sw);
628 } else if (e.getCause() instanceof IOException) {
629 log.error("Disconnecting switch {} due to IO Error: {}",
630 sw, e.getCause().getMessage());
631 ctx.getChannel().close();
632 } else if (e.getCause() instanceof SwitchStateException) {
633 log.error("Disconnecting switch {} due to switch state error: {}",
634 sw, e.getCause().getMessage());
635 ctx.getChannel().close();
636 } else if (e.getCause() instanceof MessageParseException) {
637 log.error("Disconnecting switch " + sw +
638 " due to message parse failure",
639 e.getCause());
640 ctx.getChannel().close();
641 } else if (e.getCause() instanceof StorageException) {
642 log.error("Terminating controller due to storage exception",
643 e.getCause());
644 terminate();
645 } else if (e.getCause() instanceof RejectedExecutionException) {
646 log.warn("Could not process message: queue full");
647 } else {
648 log.error("Error while processing message from switch " + sw,
649 e.getCause());
650 ctx.getChannel().close();
651 }
652 }
653
654 @Override
655 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
656 throws Exception {
657 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
658 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
659 e.getChannel().write(msglist);
660 }
661
662 @Override
663 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
664 throws Exception {
665 if (e.getMessage() instanceof List) {
666 @SuppressWarnings("unchecked")
667 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
668
669 for (OFMessage ofm : msglist) {
670 try {
671 processOFMessage(ofm);
672 }
673 catch (Exception ex) {
674 // We are the last handler in the stream, so run the
675 // exception through the channel again by passing in
676 // ctx.getChannel().
677 Channels.fireExceptionCaught(ctx.getChannel(), ex);
678 }
679 }
680
681 // Flush all flow-mods/packet-out generated from this "train"
682 OFSwitchImpl.flush_all();
683 }
684 }
685
686 /**
687 * Process the request for the switch description
688 */
689 @LogMessageDoc(level="ERROR",
690 message="Exception in reading description " +
691 " during handshake {exception}",
692 explanation="Could not process the switch description string",
693 recommendation=LogMessageDoc.CHECK_SWITCH)
694 void processSwitchDescReply() {
695 try {
696 // Read description, if it has been updated
697 @SuppressWarnings("unchecked")
698 Future<List<OFStatistics>> desc_future =
699 (Future<List<OFStatistics>>)sw.
700 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
701 List<OFStatistics> values =
702 desc_future.get(0, TimeUnit.MILLISECONDS);
703 if (values != null) {
704 OFDescriptionStatistics description =
705 new OFDescriptionStatistics();
706 ChannelBuffer data =
707 ChannelBuffers.buffer(description.getLength());
708 for (OFStatistics f : values) {
709 f.writeTo(data);
710 description.readFrom(data);
711 break; // SHOULD be a list of length 1
712 }
713 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
714 description);
715 sw.setSwitchProperties(description);
716 data = null;
717
718 // At this time, also set other switch properties from storage
719 boolean is_core_switch = false;
720 IResultSet resultSet = null;
721 try {
722 String swid = sw.getStringId();
723 resultSet =
724 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
725 for (Iterator<IResultSet> it =
726 resultSet.iterator(); it.hasNext();) {
727 // In case of multiple rows, use the status
728 // in last row?
729 Map<String, Object> row = it.next().getRow();
730 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
731 if (log.isDebugEnabled()) {
732 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
733 "config for switch={}, is-core={}",
734 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
735 }
736 String ics =
737 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
738 is_core_switch = ics.equals("true");
739 }
740 }
741 }
742 finally {
743 if (resultSet != null)
744 resultSet.close();
745 }
746 if (is_core_switch) {
747 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
748 new Boolean(true));
749 }
750 }
751 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
752 state.hasDescription = true;
753 checkSwitchReady();
754 }
755 catch (InterruptedException ex) {
756 // Ignore
757 }
758 catch (TimeoutException ex) {
759 // Ignore
760 } catch (Exception ex) {
761 log.error("Exception in reading description " +
762 " during handshake", ex);
763 }
764 }
765
766 /**
767 * Send initial switch setup information that we need before adding
768 * the switch
769 * @throws IOException
770 */
771 void sendHelloConfiguration() throws IOException {
772 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700773 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800774 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
775 }
776
777 /**
778 * Send the configuration requests we can only do after we have
779 * the features reply
780 * @throws IOException
781 */
782 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700783 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800784 // Ensure we receive the full packet via PacketIn
785 OFSetConfig config = (OFSetConfig) factory
786 .getMessage(OFType.SET_CONFIG);
787 config.setMissSendLength((short) 0xffff)
788 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
789 sw.write(config, null);
790 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
791 null);
792
793 // Get Description to set switch-specific flags
794 OFStatisticsRequest req = new OFStatisticsRequest();
795 req.setStatisticType(OFStatisticsType.DESC);
796 req.setLengthU(req.getLengthU());
797 Future<List<OFStatistics>> dfuture =
798 sw.getStatistics(req);
799 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
800 dfuture);
801
802 }
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700803
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700804 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800805 protected void checkSwitchReady() {
806 if (state.hsState == HandshakeState.FEATURES_REPLY &&
807 state.hasDescription && state.hasGetConfigReply) {
808
809 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700810 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800811
812 synchronized(roleChanger) {
813 // We need to keep track of all of the switches that are connected
814 // to the controller, in any role, so that we can later send the
815 // role request messages when the controller role changes.
816 // We need to be synchronized while doing this: we must not
817 // send a another role request to the connectedSwitches until
818 // we were able to add this new switch to connectedSwitches
819 // *and* send the current role to the new switch.
820 connectedSwitches.add(sw);
821
822 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800823 //Put the switch in SLAVE mode until we know we have control
824 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
825 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
826 swList.add(sw);
827 roleChanger.submitRequest(swList, Role.SLAVE);
828
Jonathan Hartcc957a02013-02-26 10:39:04 -0800829 //Request control of the switch from the global registry
830 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700831 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800832 registryService.requestControl(sw.getId(),
833 new RoleChangeCallback());
834 } catch (RegistryException e) {
835 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700836 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800837 }
838
Jonathan Hart97801ac2013-02-26 14:29:16 -0800839
Jonathan Hartcc957a02013-02-26 10:39:04 -0800840
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800841 // Send a role request if role support is enabled for the controller
842 // This is a probe that we'll use to determine if the switch
843 // actually supports the role request message. If it does we'll
844 // get back a role reply message. If it doesn't we'll get back an
845 // OFError message.
846 // If role is MASTER we will promote switch to active
847 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800848 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800849 log.debug("This controller's role is {}, " +
850 "sending initial role request msg to {}",
851 role, sw);
852 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
853 swList.add(sw);
854 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800855 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800856 }
857 else {
858 // Role supported not enabled on controller (for now)
859 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800860 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800861 "not sending role request msg to {}",
862 role, sw);
863 // Need to clear FlowMods before we add the switch
864 // and dispatch updates otherwise we have a race condition.
865 sw.clearAllFlowMods();
866 addSwitch(sw);
867 state.firstRoleReplyReceived = true;
868 }
869 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700870 if (!controlRequested) {
871 // yield to allow other thread(s) to release control
872 try {
873 Thread.sleep(10);
874 } catch (InterruptedException e) {
875 // Ignore interruptions
876 }
877 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700878 log.debug("Closing {} because we weren't able to request control " +
879 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700880 sw.channel.close();
881 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800882 }
883 }
884
885 /* Handle a role reply message we received from the switch. Since
886 * netty serializes message dispatch we don't need to synchronize
887 * against other receive operations from the same switch, so no need
888 * to synchronize addSwitch(), removeSwitch() operations from the same
889 * connection.
890 * FIXME: However, when a switch with the same DPID connects we do
891 * need some synchronization. However, handling switches with same
892 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
893 * removedSwitch notification):1
894 *
895 */
896 @LogMessageDoc(level="ERROR",
897 message="Invalid role value in role reply message",
898 explanation="Was unable to set the HA role (master or slave) " +
899 "for the controller.",
900 recommendation=LogMessageDoc.CHECK_CONTROLLER)
901 protected void handleRoleReplyMessage(OFVendor vendorMessage,
902 OFRoleReplyVendorData roleReplyVendorData) {
903 // Map from the role code in the message to our role enum
904 int nxRole = roleReplyVendorData.getRole();
905 Role role = null;
906 switch (nxRole) {
907 case OFRoleVendorData.NX_ROLE_OTHER:
908 role = Role.EQUAL;
909 break;
910 case OFRoleVendorData.NX_ROLE_MASTER:
911 role = Role.MASTER;
912 break;
913 case OFRoleVendorData.NX_ROLE_SLAVE:
914 role = Role.SLAVE;
915 break;
916 default:
917 log.error("Invalid role value in role reply message");
918 sw.getChannel().close();
919 return;
920 }
921
922 log.debug("Handling role reply for role {} from {}. " +
923 "Controller's role is {} ",
924 new Object[] { role, sw, Controller.this.role}
925 );
926
927 sw.deliverRoleReply(vendorMessage.getXid(), role);
928
929 boolean isActive = activeSwitches.containsKey(sw.getId());
930 if (!isActive && sw.isActive()) {
931 // Transition from SLAVE to MASTER.
932
933 if (!state.firstRoleReplyReceived ||
934 getAlwaysClearFlowsOnSwAdd()) {
935 // This is the first role-reply message we receive from
936 // this switch or roles were disabled when the switch
937 // connected:
938 // Delete all pre-existing flows for new connections to
939 // the master
940 //
941 // FIXME: Need to think more about what the test should
942 // be for when we flush the flow-table? For example,
943 // if all the controllers are temporarily in the backup
944 // role (e.g. right after a failure of the master
945 // controller) at the point the switch connects, then
946 // all of the controllers will initially connect as
947 // backup controllers and not flush the flow-table.
948 // Then when one of them is promoted to master following
949 // the master controller election the flow-table
950 // will still not be flushed because that's treated as
951 // a failover event where we don't want to flush the
952 // flow-table. The end result would be that the flow
953 // table for a newly connected switch is never
954 // flushed. Not sure how to handle that case though...
955 sw.clearAllFlowMods();
956 log.debug("First role reply from master switch {}, " +
957 "clear FlowTable to active switch list",
958 HexString.toHexString(sw.getId()));
959 }
960
961 // Some switches don't seem to update us with port
962 // status messages while in slave role.
963 readSwitchPortStateFromStorage(sw);
964
965 // Only add the switch to the active switch list if
966 // we're not in the slave role. Note that if the role
967 // attribute is null, then that means that the switch
968 // doesn't support the role request messages, so in that
969 // case we're effectively in the EQUAL role and the
970 // switch should be included in the active switch list.
971 addSwitch(sw);
972 log.debug("Added master switch {} to active switch list",
973 HexString.toHexString(sw.getId()));
974
975 }
976 else if (isActive && !sw.isActive()) {
977 // Transition from MASTER to SLAVE: remove switch
978 // from active switch list.
979 log.debug("Removed slave switch {} from active switch" +
980 " list", HexString.toHexString(sw.getId()));
981 removeSwitch(sw);
982 }
983
984 // Indicate that we have received a role reply message.
985 state.firstRoleReplyReceived = true;
986 }
987
988 protected boolean handleVendorMessage(OFVendor vendorMessage) {
989 boolean shouldHandleMessage = false;
990 int vendor = vendorMessage.getVendor();
991 switch (vendor) {
992 case OFNiciraVendorData.NX_VENDOR_ID:
993 OFNiciraVendorData niciraVendorData =
994 (OFNiciraVendorData)vendorMessage.getVendorData();
995 int dataType = niciraVendorData.getDataType();
996 switch (dataType) {
997 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
998 OFRoleReplyVendorData roleReplyVendorData =
999 (OFRoleReplyVendorData) niciraVendorData;
1000 handleRoleReplyMessage(vendorMessage,
1001 roleReplyVendorData);
1002 break;
1003 default:
1004 log.warn("Unhandled Nicira VENDOR message; " +
1005 "data type = {}", dataType);
1006 break;
1007 }
1008 break;
1009 default:
1010 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
1011 break;
1012 }
1013
1014 return shouldHandleMessage;
1015 }
1016
1017 /**
1018 * Dispatch an Openflow message from a switch to the appropriate
1019 * handler.
1020 * @param m The message to process
1021 * @throws IOException
1022 * @throws SwitchStateException
1023 */
1024 @LogMessageDocs({
1025 @LogMessageDoc(level="WARN",
1026 message="Config Reply from {switch} has " +
1027 "miss length set to {length}",
1028 explanation="The controller requires that the switch " +
1029 "use a miss length of 0xffff for correct " +
1030 "function",
1031 recommendation="Use a different switch to ensure " +
1032 "correct function"),
1033 @LogMessageDoc(level="WARN",
1034 message="Received ERROR from sw {switch} that "
1035 +"indicates roles are not supported "
1036 +"but we have received a valid "
1037 +"role reply earlier",
1038 explanation="The switch sent a confusing message to the" +
1039 "controller")
1040 })
1041 protected void processOFMessage(OFMessage m)
1042 throws IOException, SwitchStateException {
1043 boolean shouldHandleMessage = false;
1044
1045 switch (m.getType()) {
1046 case HELLO:
1047 if (log.isTraceEnabled())
1048 log.trace("HELLO from {}", sw);
1049
1050 if (state.hsState.equals(HandshakeState.START)) {
1051 state.hsState = HandshakeState.HELLO;
1052 sendHelloConfiguration();
1053 } else {
1054 throw new SwitchStateException("Unexpected HELLO from "
1055 + sw);
1056 }
1057 break;
1058 case ECHO_REQUEST:
1059 OFEchoReply reply =
1060 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
1061 reply.setXid(m.getXid());
1062 sw.write(reply, null);
1063 break;
1064 case ECHO_REPLY:
1065 break;
1066 case FEATURES_REPLY:
1067 if (log.isTraceEnabled())
1068 log.trace("Features Reply from {}", sw);
1069
1070 sw.setFeaturesReply((OFFeaturesReply) m);
1071 if (state.hsState.equals(HandshakeState.HELLO)) {
1072 sendFeatureReplyConfiguration();
1073 state.hsState = HandshakeState.FEATURES_REPLY;
1074 // uncomment to enable "dumb" switches like cbench
1075 // state.hsState = HandshakeState.READY;
1076 // addSwitch(sw);
1077 } else {
1078 // return results to rest api caller
1079 sw.deliverOFFeaturesReply(m);
1080 // update database */
1081 updateActiveSwitchInfo(sw);
1082 }
1083 break;
1084 case GET_CONFIG_REPLY:
1085 if (log.isTraceEnabled())
1086 log.trace("Get config reply from {}", sw);
1087
1088 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
1089 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
1090 throw new SwitchStateException(em);
1091 }
1092 OFGetConfigReply cr = (OFGetConfigReply) m;
1093 if (cr.getMissSendLength() == (short)0xffff) {
1094 log.trace("Config Reply from {} confirms " +
1095 "miss length set to 0xffff", sw);
1096 } else {
1097 log.warn("Config Reply from {} has " +
1098 "miss length set to {}",
1099 sw, cr.getMissSendLength() & 0xffff);
1100 }
1101 state.hasGetConfigReply = true;
1102 checkSwitchReady();
1103 break;
1104 case VENDOR:
1105 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1106 break;
1107 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001108 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001109 // TODO: we need better error handling. Especially for
1110 // request/reply style message (stats, roles) we should have
1111 // a unified way to lookup the xid in the error message.
1112 // This will probable involve rewriting the way we handle
1113 // request/reply style messages.
1114 OFError error = (OFError) m;
1115 boolean shouldLogError = true;
1116 // TODO: should we check that firstRoleReplyReceived is false,
1117 // i.e., check only whether the first request fails?
1118 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1119 boolean isBadVendorError =
1120 (error.getErrorType() == OFError.OFErrorType.
1121 OFPET_BAD_REQUEST.getValue());
1122 // We expect to receive a bad vendor error when
1123 // we're connected to a switch that doesn't support
1124 // the Nicira vendor extensions (i.e. not OVS or
1125 // derived from OVS). By protocol, it should also be
1126 // BAD_VENDOR, but too many switch implementations
1127 // get it wrong and we can already check the xid()
1128 // so we can ignore the type with confidence that this
1129 // is not a spurious error
1130 shouldLogError = !isBadVendorError;
1131 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001132 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001133 if (state.firstRoleReplyReceived && (role != null)) {
1134 log.warn("Received ERROR from sw {} that "
1135 +"indicates roles are not supported "
1136 +"but we have received a valid "
1137 +"role reply earlier", sw);
1138 }
1139 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001140 Role requestedRole =
1141 sw.deliverRoleRequestNotSupported(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001142 synchronized(roleChanger) {
1143 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001144 //This will now never happen. The Controller's role
1145 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001146 // the switch doesn't understand role request
1147 // messages and the current controller role is
1148 // slave. We need to disconnect the switch.
1149 // @see RoleChanger for rationale
Jonathan Hart9e92c512013-03-20 16:24:44 -07001150 log.warn("Closing {} channel because controller's role " +
1151 "is SLAVE", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001152 sw.getChannel().close();
1153 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001154 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001155 log.debug("Adding switch {} because we got an error" +
1156 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001157 // Controller's role is master: add to
1158 // active
1159 // TODO: check if clearing flow table is
1160 // right choice here.
1161 // Need to clear FlowMods before we add the switch
1162 // and dispatch updates otherwise we have a race condition.
1163 // TODO: switch update is async. Won't we still have a potential
1164 // race condition?
1165 sw.clearAllFlowMods();
1166 addSwitch(sw);
1167 }
1168 }
1169 }
1170 else {
1171 // TODO: Is this the right thing to do if we receive
1172 // some other error besides a bad vendor error?
1173 // Presumably that means the switch did actually
1174 // understand the role request message, but there
1175 // was some other error from processing the message.
1176 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1177 // error code, but it doesn't look like the Nicira
1178 // role request has that. Should check OVS source
1179 // code to see if it's possible for any other errors
1180 // to be returned.
1181 // If we received an error the switch is not
1182 // in the correct role, so we need to disconnect it.
1183 // We could also resend the request but then we need to
1184 // check if there are other pending request in which
1185 // case we shouldn't resend. If we do resend we need
1186 // to make sure that the switch eventually accepts one
1187 // of our requests or disconnect the switch. This feels
1188 // cumbersome.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001189 log.debug("Closing {} channel because we recieved an " +
1190 "error other than BAD_VENDOR", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001191 sw.getChannel().close();
1192 }
1193 }
1194 // Once we support OF 1.2, we'd add code to handle it here.
1195 //if (error.getXid() == state.ofRoleRequestXid) {
1196 //}
1197 if (shouldLogError)
1198 logError(sw, error);
1199 break;
1200 case STATS_REPLY:
1201 if (state.hsState.ordinal() <
1202 HandshakeState.FEATURES_REPLY.ordinal()) {
1203 String em = "Unexpected STATS_REPLY from " + sw;
1204 throw new SwitchStateException(em);
1205 }
1206 sw.deliverStatisticsReply(m);
1207 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1208 processSwitchDescReply();
1209 }
1210 break;
1211 case PORT_STATUS:
1212 // We want to update our port state info even if we're in
1213 // the slave role, but we only want to update storage if
1214 // we're the master (or equal).
1215 boolean updateStorage = state.hsState.
1216 equals(HandshakeState.READY) &&
1217 (sw.getRole() != Role.SLAVE);
1218 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1219 shouldHandleMessage = true;
1220 break;
1221
1222 default:
1223 shouldHandleMessage = true;
1224 break;
1225 }
1226
1227 if (shouldHandleMessage) {
1228 sw.getListenerReadLock().lock();
1229 try {
1230 if (sw.isConnected()) {
1231 if (!state.hsState.equals(HandshakeState.READY)) {
1232 log.debug("Ignoring message type {} received " +
1233 "from switch {} before switch is " +
1234 "fully configured.", m.getType(), sw);
1235 }
1236 // Check if the controller is in the slave role for the
1237 // switch. If it is, then don't dispatch the message to
1238 // the listeners.
1239 // TODO: Should we dispatch messages that we expect to
1240 // receive when we're in the slave role, e.g. port
1241 // status messages? Since we're "hiding" switches from
1242 // the listeners when we're in the slave role, then it
1243 // seems a little weird to dispatch port status messages
1244 // to them. On the other hand there might be special
1245 // modules that care about all of the connected switches
1246 // and would like to receive port status notifications.
1247 else if (sw.getRole() == Role.SLAVE) {
1248 // Don't log message if it's a port status message
1249 // since we expect to receive those from the switch
1250 // and don't want to emit spurious messages.
1251 if (m.getType() != OFType.PORT_STATUS) {
1252 log.debug("Ignoring message type {} received " +
1253 "from switch {} while in the slave role.",
1254 m.getType(), sw);
1255 }
1256 } else {
1257 handleMessage(sw, m, null);
1258 }
1259 }
1260 }
1261 finally {
1262 sw.getListenerReadLock().unlock();
1263 }
1264 }
1265 }
1266 }
1267
1268 // ****************
1269 // Message handlers
1270 // ****************
1271
1272 protected void handlePortStatusMessage(IOFSwitch sw,
1273 OFPortStatus m,
1274 boolean updateStorage) {
1275 short portNumber = m.getDesc().getPortNumber();
1276 OFPhysicalPort port = m.getDesc();
1277 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001278 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1279 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001280 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001281 if (!portDown) {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001282 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1283 try {
1284 this.updates.put(update);
1285 } catch (InterruptedException e) {
1286 log.error("Failure adding update to queue", e);
1287 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001288 } else {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001289 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1290 try {
1291 this.updates.put(update);
1292 } catch (InterruptedException e) {
1293 log.error("Failure adding update to queue", e);
1294 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001295 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001296 if (updateStorage)
1297 updatePortInfo(sw, port);
1298 log.debug("Port #{} modified for {}", portNumber, sw);
1299 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1300 sw.setPort(port);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001301 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1302 try {
1303 this.updates.put(update);
1304 } catch (InterruptedException e) {
1305 log.error("Failure adding update to queue", e);
1306 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001307 if (updateStorage)
1308 updatePortInfo(sw, port);
1309 log.debug("Port #{} added for {}", portNumber, sw);
1310 } else if (m.getReason() ==
1311 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1312 sw.deletePort(portNumber);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001313 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1314 try {
1315 this.updates.put(update);
1316 } catch (InterruptedException e) {
1317 log.error("Failure adding update to queue", e);
1318 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001319 if (updateStorage)
1320 removePortInfo(sw, portNumber);
1321 log.debug("Port #{} deleted for {}", portNumber, sw);
1322 }
1323 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1324 try {
1325 this.updates.put(update);
1326 } catch (InterruptedException e) {
1327 log.error("Failure adding update to queue", e);
1328 }
1329 }
1330
1331 /**
1332 * flcontext_cache - Keep a thread local stack of contexts
1333 */
1334 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1335 new ThreadLocal <Stack<FloodlightContext>> () {
1336 @Override
1337 protected Stack<FloodlightContext> initialValue() {
1338 return new Stack<FloodlightContext>();
1339 }
1340 };
1341
1342 /**
1343 * flcontext_alloc - pop a context off the stack, if required create a new one
1344 * @return FloodlightContext
1345 */
1346 protected static FloodlightContext flcontext_alloc() {
1347 FloodlightContext flcontext = null;
1348
1349 if (flcontext_cache.get().empty()) {
1350 flcontext = new FloodlightContext();
1351 }
1352 else {
1353 flcontext = flcontext_cache.get().pop();
1354 }
1355
1356 return flcontext;
1357 }
1358
1359 /**
1360 * flcontext_free - Free the context to the current thread
1361 * @param flcontext
1362 */
1363 protected void flcontext_free(FloodlightContext flcontext) {
1364 flcontext.getStorage().clear();
1365 flcontext_cache.get().push(flcontext);
1366 }
1367
1368 /**
1369 * Handle replies to certain OFMessages, and pass others off to listeners
1370 * @param sw The switch for the message
1371 * @param m The message
1372 * @param bContext The floodlight context. If null then floodlight context would
1373 * be allocated in this function
1374 * @throws IOException
1375 */
1376 @LogMessageDocs({
1377 @LogMessageDoc(level="ERROR",
1378 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1379 " field is empty.",
1380 explanation="The switch sent an improperly-formatted PacketIn" +
1381 " message",
1382 recommendation=LogMessageDoc.CHECK_SWITCH),
1383 @LogMessageDoc(level="WARN",
1384 message="Unhandled OF Message: {} from {}",
1385 explanation="The switch sent a message not handled by " +
1386 "the controller")
1387 })
1388 protected void handleMessage(IOFSwitch sw, OFMessage m,
1389 FloodlightContext bContext)
1390 throws IOException {
1391 Ethernet eth = null;
1392
1393 switch (m.getType()) {
1394 case PACKET_IN:
1395 OFPacketIn pi = (OFPacketIn)m;
1396
1397 if (pi.getPacketData().length <= 0) {
1398 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1399 ") because the data field is empty.");
1400 return;
1401 }
1402
1403 if (Controller.ALWAYS_DECODE_ETH) {
1404 eth = new Ethernet();
1405 eth.deserialize(pi.getPacketData(), 0,
1406 pi.getPacketData().length);
1407 counterStore.updatePacketInCounters(sw, m, eth);
1408 }
1409 // fall through to default case...
1410
1411 default:
1412
1413 List<IOFMessageListener> listeners = null;
1414 if (messageListeners.containsKey(m.getType())) {
1415 listeners = messageListeners.get(m.getType()).
1416 getOrderedListeners();
1417 }
1418
1419 FloodlightContext bc = null;
1420 if (listeners != null) {
1421 // Check if floodlight context is passed from the calling
1422 // function, if so use that floodlight context, otherwise
1423 // allocate one
1424 if (bContext == null) {
1425 bc = flcontext_alloc();
1426 } else {
1427 bc = bContext;
1428 }
1429 if (eth != null) {
1430 IFloodlightProviderService.bcStore.put(bc,
1431 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1432 eth);
1433 }
1434
1435 // Get the starting time (overall and per-component) of
1436 // the processing chain for this packet if performance
1437 // monitoring is turned on
1438 pktinProcTime.bootstrap(listeners);
1439 pktinProcTime.recordStartTimePktIn();
1440 Command cmd;
1441 for (IOFMessageListener listener : listeners) {
1442 if (listener instanceof IOFSwitchFilter) {
1443 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1444 continue;
1445 }
1446 }
1447
1448 pktinProcTime.recordStartTimeComp(listener);
1449 cmd = listener.receive(sw, m, bc);
1450 pktinProcTime.recordEndTimeComp(listener);
1451
1452 if (Command.STOP.equals(cmd)) {
1453 break;
1454 }
1455 }
1456 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1457 } else {
1458 log.warn("Unhandled OF Message: {} from {}", m, sw);
1459 }
1460
1461 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1462 }
1463 }
1464
1465 /**
1466 * Log an OpenFlow error message from a switch
1467 * @param sw The switch that sent the error
1468 * @param error The error message
1469 */
1470 @LogMessageDoc(level="ERROR",
1471 message="Error {error type} {error code} from {switch}",
1472 explanation="The switch responded with an unexpected error" +
1473 "to an OpenFlow message from the controller",
1474 recommendation="This could indicate improper network operation. " +
1475 "If the problem persists restarting the switch and " +
1476 "controller may help."
1477 )
1478 protected void logError(IOFSwitch sw, OFError error) {
1479 int etint = 0xffff & error.getErrorType();
1480 if (etint < 0 || etint >= OFErrorType.values().length) {
1481 log.error("Unknown error code {} from sw {}", etint, sw);
1482 }
1483 OFErrorType et = OFErrorType.values()[etint];
1484 switch (et) {
1485 case OFPET_HELLO_FAILED:
1486 OFHelloFailedCode hfc =
1487 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1488 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1489 break;
1490 case OFPET_BAD_REQUEST:
1491 OFBadRequestCode brc =
1492 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1493 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1494 break;
1495 case OFPET_BAD_ACTION:
1496 OFBadActionCode bac =
1497 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1498 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1499 break;
1500 case OFPET_FLOW_MOD_FAILED:
1501 OFFlowModFailedCode fmfc =
1502 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1503 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1504 break;
1505 case OFPET_PORT_MOD_FAILED:
1506 OFPortModFailedCode pmfc =
1507 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1508 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1509 break;
1510 case OFPET_QUEUE_OP_FAILED:
1511 OFQueueOpFailedCode qofc =
1512 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1513 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1514 break;
1515 default:
1516 break;
1517 }
1518 }
1519
1520 /**
1521 * Add a switch to the active switch list and call the switch listeners.
1522 * This happens either when a switch first connects (and the controller is
1523 * not in the slave role) or when the role of the controller changes from
1524 * slave to master.
1525 * @param sw the switch that has been added
1526 */
1527 // TODO: need to rethink locking and the synchronous switch update.
1528 // We can / should also handle duplicate DPIDs in connectedSwitches
1529 @LogMessageDoc(level="ERROR",
1530 message="New switch added {switch} for already-added switch {switch}",
1531 explanation="A switch with the same DPID as another switch " +
1532 "connected to the controller. This can be caused by " +
1533 "multiple switches configured with the same DPID, or " +
1534 "by a switch reconnected very quickly after " +
1535 "disconnecting.",
1536 recommendation="If this happens repeatedly, it is likely there " +
1537 "are switches with duplicate DPIDs on the network. " +
1538 "Reconfigure the appropriate switches. If it happens " +
1539 "very rarely, then it is likely this is a transient " +
1540 "network problem that can be ignored."
1541 )
1542 protected void addSwitch(IOFSwitch sw) {
1543 // TODO: is it safe to modify the HashMap without holding
1544 // the old switch's lock?
1545 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1546 if (sw == oldSw) {
1547 // Note == for object equality, not .equals for value
1548 log.info("New add switch for pre-existing switch {}", sw);
1549 return;
1550 }
1551
1552 if (oldSw != null) {
1553 oldSw.getListenerWriteLock().lock();
1554 try {
1555 log.error("New switch added {} for already-added switch {}",
1556 sw, oldSw);
1557 // Set the connected flag to false to suppress calling
1558 // the listeners for this switch in processOFMessage
1559 oldSw.setConnected(false);
1560
1561 oldSw.cancelAllStatisticsReplies();
1562
1563 updateInactiveSwitchInfo(oldSw);
1564
1565 // we need to clean out old switch state definitively
1566 // before adding the new switch
1567 // FIXME: It seems not completely kosher to call the
1568 // switch listeners here. I thought one of the points of
1569 // having the asynchronous switch update mechanism was so
1570 // the addedSwitch and removedSwitch were always called
1571 // from a single thread to simplify concurrency issues
1572 // for the listener.
1573 if (switchListeners != null) {
1574 for (IOFSwitchListener listener : switchListeners) {
1575 listener.removedSwitch(oldSw);
1576 }
1577 }
1578 // will eventually trigger a removeSwitch(), which will cause
1579 // a "Not removing Switch ... already removed debug message.
1580 // TODO: Figure out a way to handle this that avoids the
1581 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001582 log.debug("Closing {} because a new IOFSwitch got added " +
1583 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001584 oldSw.getChannel().close();
1585 }
1586 finally {
1587 oldSw.getListenerWriteLock().unlock();
1588 }
1589 }
1590
1591 updateActiveSwitchInfo(sw);
1592 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1593 try {
1594 this.updates.put(update);
1595 } catch (InterruptedException e) {
1596 log.error("Failure adding update to queue", e);
1597 }
1598 }
1599
1600 /**
1601 * Remove a switch from the active switch list and call the switch listeners.
1602 * This happens either when the switch is disconnected or when the
1603 * controller's role for the switch changes from master to slave.
1604 * @param sw the switch that has been removed
1605 */
1606 protected void removeSwitch(IOFSwitch sw) {
1607 // No need to acquire the listener lock, since
1608 // this method is only called after netty has processed all
1609 // pending messages
1610 log.debug("removeSwitch: {}", sw);
1611 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1612 log.debug("Not removing switch {}; already removed", sw);
1613 return;
1614 }
1615 // We cancel all outstanding statistics replies if the switch transition
1616 // from active. In the future we might allow statistics requests
1617 // from slave controllers. Then we need to move this cancelation
1618 // to switch disconnect
1619 sw.cancelAllStatisticsReplies();
1620
1621 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1622 // here if role support is enabled. In that case if the switch is being
1623 // removed because we've been switched to being in the slave role, then I think
1624 // it's possible that the new master may have already been promoted to master
1625 // and written out the active switch state to storage. If we now execute
1626 // updateInactiveSwitchInfo we may wipe out all of the state that was
1627 // written out by the new master. Maybe need to revisit how we handle all
1628 // of the switch state that's written to storage.
1629
1630 updateInactiveSwitchInfo(sw);
1631 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1632 try {
1633 this.updates.put(update);
1634 } catch (InterruptedException e) {
1635 log.error("Failure adding update to queue", e);
1636 }
1637 }
1638
1639 // ***************
1640 // IFloodlightProvider
1641 // ***************
1642
1643 @Override
1644 public synchronized void addOFMessageListener(OFType type,
1645 IOFMessageListener listener) {
1646 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1647 messageListeners.get(type);
1648 if (ldd == null) {
1649 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1650 messageListeners.put(type, ldd);
1651 }
1652 ldd.addListener(type, listener);
1653 }
1654
1655 @Override
1656 public synchronized void removeOFMessageListener(OFType type,
1657 IOFMessageListener listener) {
1658 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1659 messageListeners.get(type);
1660 if (ldd != null) {
1661 ldd.removeListener(listener);
1662 }
1663 }
1664
1665 private void logListeners() {
1666 for (Map.Entry<OFType,
1667 ListenerDispatcher<OFType,
1668 IOFMessageListener>> entry
1669 : messageListeners.entrySet()) {
1670
1671 OFType type = entry.getKey();
1672 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1673 entry.getValue();
1674
1675 StringBuffer sb = new StringBuffer();
1676 sb.append("OFListeners for ");
1677 sb.append(type);
1678 sb.append(": ");
1679 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1680 sb.append(l.getName());
1681 sb.append(",");
1682 }
1683 log.debug(sb.toString());
1684 }
1685 }
1686
1687 public void removeOFMessageListeners(OFType type) {
1688 messageListeners.remove(type);
1689 }
1690
1691 @Override
1692 public Map<Long, IOFSwitch> getSwitches() {
1693 return Collections.unmodifiableMap(this.activeSwitches);
1694 }
1695
1696 @Override
1697 public void addOFSwitchListener(IOFSwitchListener listener) {
1698 this.switchListeners.add(listener);
1699 }
1700
1701 @Override
1702 public void removeOFSwitchListener(IOFSwitchListener listener) {
1703 this.switchListeners.remove(listener);
1704 }
1705
1706 @Override
1707 public Map<OFType, List<IOFMessageListener>> getListeners() {
1708 Map<OFType, List<IOFMessageListener>> lers =
1709 new HashMap<OFType, List<IOFMessageListener>>();
1710 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1711 messageListeners.entrySet()) {
1712 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1713 }
1714 return Collections.unmodifiableMap(lers);
1715 }
1716
1717 @Override
1718 @LogMessageDocs({
1719 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1720 "a null switch",
1721 explanation="Failed to process a message because the switch " +
1722 " is no longer connected."),
1723 @LogMessageDoc(level="ERROR",
1724 message="Error reinjecting OFMessage on switch {switch}",
1725 explanation="An I/O error occured while attempting to " +
1726 "process an OpenFlow message",
1727 recommendation=LogMessageDoc.CHECK_SWITCH)
1728 })
1729 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1730 FloodlightContext bc) {
1731 if (sw == null) {
1732 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1733 return false;
1734 }
1735
1736 // FIXME: Do we need to be able to inject messages to switches
1737 // where we're the slave controller (i.e. they're connected but
1738 // not active)?
1739 // FIXME: Don't we need synchronization logic here so we're holding
1740 // the listener read lock when we call handleMessage? After some
1741 // discussions it sounds like the right thing to do here would be to
1742 // inject the message as a netty upstream channel event so it goes
1743 // through the normal netty event processing, including being
1744 // handled
1745 if (!activeSwitches.containsKey(sw.getId())) return false;
1746
1747 try {
1748 // Pass Floodlight context to the handleMessages()
1749 handleMessage(sw, msg, bc);
1750 } catch (IOException e) {
1751 log.error("Error reinjecting OFMessage on switch {}",
1752 HexString.toHexString(sw.getId()));
1753 return false;
1754 }
1755 return true;
1756 }
1757
1758 @Override
1759 @LogMessageDoc(message="Calling System.exit",
1760 explanation="The controller is terminating")
1761 public synchronized void terminate() {
1762 log.info("Calling System.exit");
1763 System.exit(1);
1764 }
1765
1766 @Override
1767 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1768 // call the overloaded version with floodlight context set to null
1769 return injectOfMessage(sw, msg, null);
1770 }
1771
1772 @Override
1773 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1774 FloodlightContext bc) {
1775 if (log.isTraceEnabled()) {
1776 String str = OFMessage.getDataAsString(sw, m, bc);
1777 log.trace("{}", str);
1778 }
1779
1780 List<IOFMessageListener> listeners = null;
1781 if (messageListeners.containsKey(m.getType())) {
1782 listeners =
1783 messageListeners.get(m.getType()).getOrderedListeners();
1784 }
1785
1786 if (listeners != null) {
1787 for (IOFMessageListener listener : listeners) {
1788 if (listener instanceof IOFSwitchFilter) {
1789 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1790 continue;
1791 }
1792 }
1793 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1794 break;
1795 }
1796 }
1797 }
1798 }
1799
1800 @Override
1801 public BasicFactory getOFMessageFactory() {
1802 return factory;
1803 }
1804
1805 @Override
1806 public String getControllerId() {
1807 return controllerId;
1808 }
1809
1810 // **************
1811 // Initialization
1812 // **************
1813
1814 protected void updateAllInactiveSwitchInfo() {
1815 if (role == Role.SLAVE) {
1816 return;
1817 }
1818 String controllerId = getControllerId();
1819 String[] switchColumns = { SWITCH_DATAPATH_ID,
1820 SWITCH_CONTROLLER_ID,
1821 SWITCH_ACTIVE };
1822 String[] portColumns = { PORT_ID, PORT_SWITCH };
1823 IResultSet switchResultSet = null;
1824 try {
1825 OperatorPredicate op =
1826 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1827 OperatorPredicate.Operator.EQ,
1828 controllerId);
1829 switchResultSet =
1830 storageSource.executeQuery(SWITCH_TABLE_NAME,
1831 switchColumns,
1832 op, null);
1833 while (switchResultSet.next()) {
1834 IResultSet portResultSet = null;
1835 try {
1836 String datapathId =
1837 switchResultSet.getString(SWITCH_DATAPATH_ID);
1838 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1839 op = new OperatorPredicate(PORT_SWITCH,
1840 OperatorPredicate.Operator.EQ,
1841 datapathId);
1842 portResultSet =
1843 storageSource.executeQuery(PORT_TABLE_NAME,
1844 portColumns,
1845 op, null);
1846 while (portResultSet.next()) {
1847 portResultSet.deleteRow();
1848 }
1849 portResultSet.save();
1850 }
1851 finally {
1852 if (portResultSet != null)
1853 portResultSet.close();
1854 }
1855 }
1856 switchResultSet.save();
1857 }
1858 finally {
1859 if (switchResultSet != null)
1860 switchResultSet.close();
1861 }
1862 }
1863
1864 protected void updateControllerInfo() {
1865 updateAllInactiveSwitchInfo();
1866
1867 // Write out the controller info to the storage source
1868 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1869 String id = getControllerId();
1870 controllerInfo.put(CONTROLLER_ID, id);
1871 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1872 }
1873
1874 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1875 if (role == Role.SLAVE) {
1876 return;
1877 }
1878 // Obtain the row info for the switch
1879 Map<String, Object> switchInfo = new HashMap<String, Object>();
1880 String datapathIdString = sw.getStringId();
1881 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1882 String controllerId = getControllerId();
1883 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1884 Date connectedSince = sw.getConnectedSince();
1885 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1886 Channel channel = sw.getChannel();
1887 SocketAddress socketAddress = channel.getRemoteAddress();
1888 if (socketAddress != null) {
1889 String socketAddressString = socketAddress.toString();
1890 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1891 if (socketAddress instanceof InetSocketAddress) {
1892 InetSocketAddress inetSocketAddress =
1893 (InetSocketAddress)socketAddress;
1894 InetAddress inetAddress = inetSocketAddress.getAddress();
1895 String ip = inetAddress.getHostAddress();
1896 switchInfo.put(SWITCH_IP, ip);
1897 }
1898 }
1899
1900 // Write out the switch features info
1901 long capabilities = U32.f(sw.getCapabilities());
1902 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1903 long buffers = U32.f(sw.getBuffers());
1904 switchInfo.put(SWITCH_BUFFERS, buffers);
1905 long tables = U32.f(sw.getTables());
1906 switchInfo.put(SWITCH_TABLES, tables);
1907 long actions = U32.f(sw.getActions());
1908 switchInfo.put(SWITCH_ACTIONS, actions);
1909 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1910
1911 // Update the switch
1912 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1913
1914 // Update the ports
1915 for (OFPhysicalPort port: sw.getPorts()) {
1916 updatePortInfo(sw, port);
1917 }
1918 }
1919
1920 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1921 if (role == Role.SLAVE) {
1922 return;
1923 }
1924 log.debug("Update DB with inactiveSW {}", sw);
1925 // Update the controller info in the storage source to be inactive
1926 Map<String, Object> switchInfo = new HashMap<String, Object>();
1927 String datapathIdString = sw.getStringId();
1928 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1929 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1930 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1931 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1932 }
1933
1934 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1935 if (role == Role.SLAVE) {
1936 return;
1937 }
1938 String datapathIdString = sw.getStringId();
1939 Map<String, Object> portInfo = new HashMap<String, Object>();
1940 int portNumber = U16.f(port.getPortNumber());
1941 String id = datapathIdString + "|" + portNumber;
1942 portInfo.put(PORT_ID, id);
1943 portInfo.put(PORT_SWITCH, datapathIdString);
1944 portInfo.put(PORT_NUMBER, portNumber);
1945 byte[] hardwareAddress = port.getHardwareAddress();
1946 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1947 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1948 String name = port.getName();
1949 portInfo.put(PORT_NAME, name);
1950 long config = U32.f(port.getConfig());
1951 portInfo.put(PORT_CONFIG, config);
1952 long state = U32.f(port.getState());
1953 portInfo.put(PORT_STATE, state);
1954 long currentFeatures = U32.f(port.getCurrentFeatures());
1955 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1956 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1957 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1958 long supportedFeatures = U32.f(port.getSupportedFeatures());
1959 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1960 long peerFeatures = U32.f(port.getPeerFeatures());
1961 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1962 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1963 }
1964
1965 /**
1966 * Read switch port data from storage and write it into a switch object
1967 * @param sw the switch to update
1968 */
1969 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1970 OperatorPredicate op =
1971 new OperatorPredicate(PORT_SWITCH,
1972 OperatorPredicate.Operator.EQ,
1973 sw.getStringId());
1974 IResultSet portResultSet =
1975 storageSource.executeQuery(PORT_TABLE_NAME,
1976 null, op, null);
1977 //Map<Short, OFPhysicalPort> oldports =
1978 // new HashMap<Short, OFPhysicalPort>();
1979 //oldports.putAll(sw.getPorts());
1980
1981 while (portResultSet.next()) {
1982 try {
1983 OFPhysicalPort p = new OFPhysicalPort();
1984 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1985 p.setName(portResultSet.getString(PORT_NAME));
1986 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1987 p.setState((int)portResultSet.getLong(PORT_STATE));
1988 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1989 p.setHardwareAddress(HexString.fromHexString(portMac));
1990 p.setCurrentFeatures((int)portResultSet.
1991 getLong(PORT_CURRENT_FEATURES));
1992 p.setAdvertisedFeatures((int)portResultSet.
1993 getLong(PORT_ADVERTISED_FEATURES));
1994 p.setSupportedFeatures((int)portResultSet.
1995 getLong(PORT_SUPPORTED_FEATURES));
1996 p.setPeerFeatures((int)portResultSet.
1997 getLong(PORT_PEER_FEATURES));
1998 //oldports.remove(Short.valueOf(p.getPortNumber()));
1999 sw.setPort(p);
2000 } catch (NullPointerException e) {
2001 // ignore
2002 }
2003 }
2004 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
2005 try {
2006 this.updates.put(update);
2007 } catch (InterruptedException e) {
2008 log.error("Failure adding update to queue", e);
2009 }
2010 }
2011
2012 protected void removePortInfo(IOFSwitch sw, short portNumber) {
2013 if (role == Role.SLAVE) {
2014 return;
2015 }
2016 String datapathIdString = sw.getStringId();
2017 String id = datapathIdString + "|" + portNumber;
2018 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
2019 }
2020
2021 /**
2022 * Sets the initial role based on properties in the config params.
2023 * It looks for two different properties.
2024 * If the "role" property is specified then the value should be
2025 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
2026 * controller is set to the specified value. If the "role" property
2027 * is not specified then it looks next for the "role.path" property.
2028 * In this case the value should be the path to a property file in
2029 * the file system that contains a property called "floodlight.role"
2030 * which can be one of the values listed above for the "role" property.
2031 * The idea behind the "role.path" mechanism is that you have some
2032 * separate heartbeat and master controller election algorithm that
2033 * determines the role of the controller. When a role transition happens,
2034 * it updates the current role in the file specified by the "role.path"
2035 * file. Then if floodlight restarts for some reason it can get the
2036 * correct current role of the controller from the file.
2037 * @param configParams The config params for the FloodlightProvider service
2038 * @return A valid role if role information is specified in the
2039 * config params, otherwise null
2040 */
2041 @LogMessageDocs({
2042 @LogMessageDoc(message="Controller role set to {role}",
2043 explanation="Setting the initial HA role to "),
2044 @LogMessageDoc(level="ERROR",
2045 message="Invalid current role value: {role}",
2046 explanation="An invalid HA role value was read from the " +
2047 "properties file",
2048 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2049 })
2050 protected Role getInitialRole(Map<String, String> configParams) {
2051 Role role = null;
2052 String roleString = configParams.get("role");
2053 if (roleString == null) {
2054 String rolePath = configParams.get("rolepath");
2055 if (rolePath != null) {
2056 Properties properties = new Properties();
2057 try {
2058 properties.load(new FileInputStream(rolePath));
2059 roleString = properties.getProperty("floodlight.role");
2060 }
2061 catch (IOException exc) {
2062 // Don't treat it as an error if the file specified by the
2063 // rolepath property doesn't exist. This lets us enable the
2064 // HA mechanism by just creating/setting the floodlight.role
2065 // property in that file without having to modify the
2066 // floodlight properties.
2067 }
2068 }
2069 }
2070
2071 if (roleString != null) {
2072 // Canonicalize the string to the form used for the enum constants
2073 roleString = roleString.trim().toUpperCase();
2074 try {
2075 role = Role.valueOf(roleString);
2076 }
2077 catch (IllegalArgumentException exc) {
2078 log.error("Invalid current role value: {}", roleString);
2079 }
2080 }
2081
2082 log.info("Controller role set to {}", role);
2083
2084 return role;
2085 }
2086
2087 /**
2088 * Tell controller that we're ready to accept switches loop
2089 * @throws IOException
2090 */
2091 @LogMessageDocs({
2092 @LogMessageDoc(message="Listening for switch connections on {address}",
2093 explanation="The controller is ready and listening for new" +
2094 " switch connections"),
2095 @LogMessageDoc(message="Storage exception in controller " +
2096 "updates loop; terminating process",
2097 explanation=ERROR_DATABASE,
2098 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2099 @LogMessageDoc(level="ERROR",
2100 message="Exception in controller updates loop",
2101 explanation="Failed to dispatch controller event",
2102 recommendation=LogMessageDoc.GENERIC_ACTION)
2103 })
2104 public void run() {
2105 if (log.isDebugEnabled()) {
2106 logListeners();
2107 }
2108
2109 try {
2110 final ServerBootstrap bootstrap = createServerBootStrap();
2111
2112 bootstrap.setOption("reuseAddr", true);
2113 bootstrap.setOption("child.keepAlive", true);
2114 bootstrap.setOption("child.tcpNoDelay", true);
2115 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2116
2117 ChannelPipelineFactory pfact =
2118 new OpenflowPipelineFactory(this, null);
2119 bootstrap.setPipelineFactory(pfact);
2120 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2121 final ChannelGroup cg = new DefaultChannelGroup();
2122 cg.add(bootstrap.bind(sa));
2123
2124 log.info("Listening for switch connections on {}", sa);
2125 } catch (Exception e) {
2126 throw new RuntimeException(e);
2127 }
2128
2129 // main loop
2130 while (true) {
2131 try {
2132 IUpdate update = updates.take();
2133 update.dispatch();
2134 } catch (InterruptedException e) {
2135 return;
2136 } catch (StorageException e) {
2137 log.error("Storage exception in controller " +
2138 "updates loop; terminating process", e);
2139 return;
2140 } catch (Exception e) {
2141 log.error("Exception in controller updates loop", e);
2142 }
2143 }
2144 }
2145
2146 private ServerBootstrap createServerBootStrap() {
2147 if (workerThreads == 0) {
2148 return new ServerBootstrap(
2149 new NioServerSocketChannelFactory(
2150 Executors.newCachedThreadPool(),
2151 Executors.newCachedThreadPool()));
2152 } else {
2153 return new ServerBootstrap(
2154 new NioServerSocketChannelFactory(
2155 Executors.newCachedThreadPool(),
2156 Executors.newCachedThreadPool(), workerThreads));
2157 }
2158 }
2159
2160 public void setConfigParams(Map<String, String> configParams) {
2161 String ofPort = configParams.get("openflowport");
2162 if (ofPort != null) {
2163 this.openFlowPort = Integer.parseInt(ofPort);
2164 }
2165 log.debug("OpenFlow port set to {}", this.openFlowPort);
2166 String threads = configParams.get("workerthreads");
2167 if (threads != null) {
2168 this.workerThreads = Integer.parseInt(threads);
2169 }
2170 log.debug("Number of worker threads set to {}", this.workerThreads);
2171 String controllerId = configParams.get("controllerid");
2172 if (controllerId != null) {
2173 this.controllerId = controllerId;
2174 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002175 else {
2176 //Try to get the hostname of the machine and use that for controller ID
2177 try {
2178 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2179 this.controllerId = hostname;
2180 } catch (UnknownHostException e) {
2181 // Can't get hostname, we'll just use the default
2182 }
2183 }
2184
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002185 log.debug("ControllerId set to {}", this.controllerId);
2186 }
2187
2188 private void initVendorMessages() {
2189 // Configure openflowj to be able to parse the role request/reply
2190 // vendor messages.
2191 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2192 OFNiciraVendorData.NX_VENDOR_ID, 4);
2193 OFVendorId.registerVendorId(niciraVendorId);
2194 OFBasicVendorDataType roleRequestVendorData =
2195 new OFBasicVendorDataType(
2196 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2197 OFRoleRequestVendorData.getInstantiable());
2198 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2199 OFBasicVendorDataType roleReplyVendorData =
2200 new OFBasicVendorDataType(
2201 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2202 OFRoleReplyVendorData.getInstantiable());
2203 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2204 }
2205
2206 /**
2207 * Initialize internal data structures
2208 */
2209 public void init(Map<String, String> configParams) {
2210 // These data structures are initialized here because other
2211 // module's startUp() might be called before ours
2212 this.messageListeners =
2213 new ConcurrentHashMap<OFType,
2214 ListenerDispatcher<OFType,
2215 IOFMessageListener>>();
2216 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2217 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2218 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2219 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2220 this.controllerNodeIPsCache = new HashMap<String, String>();
2221 this.updates = new LinkedBlockingQueue<IUpdate>();
2222 this.factory = new BasicFactory();
2223 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2224 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002225 //this.role = getInitialRole(configParams);
2226 //Set the controller's role to MASTER so it always tries to do role requests.
2227 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002228 this.roleChanger = new RoleChanger();
2229 initVendorMessages();
2230 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002231 }
2232
2233 /**
2234 * Startup all of the controller's components
2235 */
2236 @LogMessageDoc(message="Waiting for storage source",
2237 explanation="The system database is not yet ready",
2238 recommendation="If this message persists, this indicates " +
2239 "that the system database has failed to start. " +
2240 LogMessageDoc.CHECK_CONTROLLER)
2241 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002242 try {
2243 registryService.registerController(controllerId);
2244 } catch (RegistryException e2) {
2245 log.warn("Registry service error: {}", e2.getMessage());
2246 }
2247
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002248 // Create the table names we use
2249 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2250 storageSource.createTable(SWITCH_TABLE_NAME, null);
2251 storageSource.createTable(PORT_TABLE_NAME, null);
2252 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2253 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2254 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2255 CONTROLLER_ID);
2256 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2257 SWITCH_DATAPATH_ID);
2258 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2259 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2260 CONTROLLER_INTERFACE_ID);
2261 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2262
2263 while (true) {
2264 try {
2265 updateControllerInfo();
2266 break;
2267 }
2268 catch (StorageException e) {
2269 log.info("Waiting for storage source");
2270 try {
2271 Thread.sleep(1000);
2272 } catch (InterruptedException e1) {
2273 }
2274 }
2275 }
2276
2277 // Add our REST API
2278 restApi.addRestletRoutable(new CoreWebRoutable());
2279 }
2280
2281 @Override
2282 public void addInfoProvider(String type, IInfoProvider provider) {
2283 if (!providerMap.containsKey(type)) {
2284 providerMap.put(type, new ArrayList<IInfoProvider>());
2285 }
2286 providerMap.get(type).add(provider);
2287 }
2288
2289 @Override
2290 public void removeInfoProvider(String type, IInfoProvider provider) {
2291 if (!providerMap.containsKey(type)) {
2292 log.debug("Provider type {} doesn't exist.", type);
2293 return;
2294 }
2295
2296 providerMap.get(type).remove(provider);
2297 }
2298
2299 public Map<String, Object> getControllerInfo(String type) {
2300 if (!providerMap.containsKey(type)) return null;
2301
2302 Map<String, Object> result = new LinkedHashMap<String, Object>();
2303 for (IInfoProvider provider : providerMap.get(type)) {
2304 result.putAll(provider.getInfo(type));
2305 }
2306
2307 return result;
2308 }
2309
2310 @Override
2311 public void addHAListener(IHAListener listener) {
2312 this.haListeners.add(listener);
2313 }
2314
2315 @Override
2316 public void removeHAListener(IHAListener listener) {
2317 this.haListeners.remove(listener);
2318 }
2319
2320
2321 /**
2322 * Handle changes to the controller nodes IPs and dispatch update.
2323 */
2324 @SuppressWarnings("unchecked")
2325 protected void handleControllerNodeIPChanges() {
2326 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2327 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2328 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2329 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2330 CONTROLLER_INTERFACE_TYPE,
2331 CONTROLLER_INTERFACE_NUMBER,
2332 CONTROLLER_INTERFACE_DISCOVERED_IP };
2333 synchronized(controllerNodeIPsCache) {
2334 // We currently assume that interface Ethernet0 is the relevant
2335 // controller interface. Might change.
2336 // We could (should?) implement this using
2337 // predicates, but creating the individual and compound predicate
2338 // seems more overhead then just checking every row. Particularly,
2339 // since the number of rows is small and changes infrequent
2340 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2341 colNames,null, null);
2342 while (res.next()) {
2343 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2344 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2345 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2346 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2347 String curIP = controllerNodeIPsCache.get(controllerID);
2348
2349 curControllerNodeIPs.put(controllerID, discoveredIP);
2350 if (curIP == null) {
2351 // new controller node IP
2352 addedControllerNodeIPs.put(controllerID, discoveredIP);
2353 }
2354 else if (curIP != discoveredIP) {
2355 // IP changed
2356 removedControllerNodeIPs.put(controllerID, curIP);
2357 addedControllerNodeIPs.put(controllerID, discoveredIP);
2358 }
2359 }
2360 }
2361 // Now figure out if rows have been deleted. We can't use the
2362 // rowKeys from rowsDeleted directly, since the tables primary
2363 // key is a compound that we can't disassemble
2364 Set<String> curEntries = curControllerNodeIPs.keySet();
2365 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2366 removedEntries.removeAll(curEntries);
2367 for (String removedControllerID : removedEntries)
2368 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2369 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2370 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2371 curControllerNodeIPs, addedControllerNodeIPs,
2372 removedControllerNodeIPs);
2373 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2374 try {
2375 this.updates.put(update);
2376 } catch (InterruptedException e) {
2377 log.error("Failure adding update to queue", e);
2378 }
2379 }
2380 }
2381 }
2382
2383 @Override
2384 public Map<String, String> getControllerNodeIPs() {
2385 // We return a copy of the mapping so we can guarantee that
2386 // the mapping return is the same as one that will be (or was)
2387 // dispatched to IHAListeners
2388 HashMap<String,String> retval = new HashMap<String,String>();
2389 synchronized(controllerNodeIPsCache) {
2390 retval.putAll(controllerNodeIPsCache);
2391 }
2392 return retval;
2393 }
2394
2395 @Override
2396 public void rowsModified(String tableName, Set<Object> rowKeys) {
2397 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2398 handleControllerNodeIPChanges();
2399 }
2400
2401 }
2402
2403 @Override
2404 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2405 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2406 handleControllerNodeIPChanges();
2407 }
2408 }
2409
2410 @Override
2411 public long getSystemStartTime() {
2412 return (this.systemStartTime);
2413 }
2414
2415 @Override
2416 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2417 this.alwaysClearFlowsOnSwAdd = value;
2418 }
2419
2420 public boolean getAlwaysClearFlowsOnSwAdd() {
2421 return this.alwaysClearFlowsOnSwAdd;
2422 }
2423}