blob: 9efbe7d721b5e990f5ceed59fe518c54ee5e3a3c [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080058import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
61import net.floodlightcontroller.core.annotations.LogMessageDoc;
62import net.floodlightcontroller.core.annotations.LogMessageDocs;
63import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
64import net.floodlightcontroller.core.util.ListenerDispatcher;
65import net.floodlightcontroller.core.web.CoreWebRoutable;
66import net.floodlightcontroller.counter.ICounterStoreService;
67import net.floodlightcontroller.packet.Ethernet;
68import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
69import net.floodlightcontroller.restserver.IRestApiService;
70import net.floodlightcontroller.storage.IResultSet;
71import net.floodlightcontroller.storage.IStorageSourceListener;
72import net.floodlightcontroller.storage.IStorageSourceService;
73import net.floodlightcontroller.storage.OperatorPredicate;
74import net.floodlightcontroller.storage.StorageException;
75import net.floodlightcontroller.threadpool.IThreadPoolService;
HIGUCHI Yuta20514902013-06-12 11:24:16 -070076import net.onrc.onos.ofcontroller.core.INetMapStorage.DM_OPERATION;
77import net.onrc.onos.ofcontroller.core.INetMapTopologyService.ITopoRouteService;
78import net.onrc.onos.ofcontroller.core.ISwitchStorage.SwitchState;
HIGUCHI Yutaedf81d72013-06-12 12:06:57 -070079import net.onrc.onos.ofcontroller.flowcache.IFlowService;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080080import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080081import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080082import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080083
84import org.jboss.netty.bootstrap.ServerBootstrap;
85import org.jboss.netty.buffer.ChannelBuffer;
86import org.jboss.netty.buffer.ChannelBuffers;
87import org.jboss.netty.channel.Channel;
88import org.jboss.netty.channel.ChannelHandlerContext;
89import org.jboss.netty.channel.ChannelPipelineFactory;
90import org.jboss.netty.channel.ChannelStateEvent;
91import org.jboss.netty.channel.ChannelUpstreamHandler;
92import org.jboss.netty.channel.Channels;
93import org.jboss.netty.channel.ExceptionEvent;
94import org.jboss.netty.channel.MessageEvent;
95import org.jboss.netty.channel.group.ChannelGroup;
96import org.jboss.netty.channel.group.DefaultChannelGroup;
97import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
98import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
99import org.jboss.netty.handler.timeout.IdleStateEvent;
100import org.jboss.netty.handler.timeout.ReadTimeoutException;
101import org.openflow.protocol.OFEchoReply;
102import org.openflow.protocol.OFError;
103import org.openflow.protocol.OFError.OFBadActionCode;
104import org.openflow.protocol.OFError.OFBadRequestCode;
105import org.openflow.protocol.OFError.OFErrorType;
106import org.openflow.protocol.OFError.OFFlowModFailedCode;
107import org.openflow.protocol.OFError.OFHelloFailedCode;
108import org.openflow.protocol.OFError.OFPortModFailedCode;
109import org.openflow.protocol.OFError.OFQueueOpFailedCode;
110import org.openflow.protocol.OFFeaturesReply;
111import org.openflow.protocol.OFGetConfigReply;
112import org.openflow.protocol.OFMessage;
113import org.openflow.protocol.OFPacketIn;
114import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800115import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800116import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800117import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800118import org.openflow.protocol.OFPortStatus.OFPortReason;
119import org.openflow.protocol.OFSetConfig;
120import org.openflow.protocol.OFStatisticsRequest;
121import org.openflow.protocol.OFSwitchConfig;
122import org.openflow.protocol.OFType;
123import org.openflow.protocol.OFVendor;
124import org.openflow.protocol.factory.BasicFactory;
125import org.openflow.protocol.factory.MessageParseException;
126import org.openflow.protocol.statistics.OFDescriptionStatistics;
127import org.openflow.protocol.statistics.OFStatistics;
128import org.openflow.protocol.statistics.OFStatisticsType;
129import org.openflow.protocol.vendor.OFBasicVendorDataType;
130import org.openflow.protocol.vendor.OFBasicVendorId;
131import org.openflow.protocol.vendor.OFVendorId;
132import org.openflow.util.HexString;
133import org.openflow.util.U16;
134import org.openflow.util.U32;
135import org.openflow.vendor.nicira.OFNiciraVendorData;
136import org.openflow.vendor.nicira.OFRoleReplyVendorData;
137import org.openflow.vendor.nicira.OFRoleRequestVendorData;
138import org.openflow.vendor.nicira.OFRoleVendorData;
139import org.slf4j.Logger;
140import org.slf4j.LoggerFactory;
141
142
143/**
144 * The main controller class. Handles all setup and network listeners
145 */
146public class Controller implements IFloodlightProviderService,
147 IStorageSourceListener {
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700148
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800149 protected static Logger log = LoggerFactory.getLogger(Controller.class);
150
151 private static final String ERROR_DATABASE =
152 "The controller could not communicate with the system database.";
153
154 protected BasicFactory factory;
155 protected ConcurrentMap<OFType,
156 ListenerDispatcher<OFType,IOFMessageListener>>
157 messageListeners;
158 // The activeSwitches map contains only those switches that are actively
159 // being controlled by us -- it doesn't contain switches that are
160 // in the slave role
161 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
162 // connectedSwitches contains all connected switches, including ones where
163 // we're a slave controller. We need to keep track of them so that we can
164 // send role request messages to switches when our role changes to master
165 // We add a switch to this set after it successfully completes the
166 // handshake. Access to this Set needs to be synchronized with roleChanger
167 protected HashSet<OFSwitchImpl> connectedSwitches;
168
169 // The controllerNodeIPsCache maps Controller IDs to their IP address.
170 // It's only used by handleControllerNodeIPsChanged
171 protected HashMap<String, String> controllerNodeIPsCache;
172
173 protected Set<IOFSwitchListener> switchListeners;
174 protected Set<IHAListener> haListeners;
175 protected Map<String, List<IInfoProvider>> providerMap;
176 protected BlockingQueue<IUpdate> updates;
177
178 // Module dependencies
179 protected IRestApiService restApi;
180 protected ICounterStoreService counterStore = null;
181 protected IStorageSourceService storageSource;
182 protected IPktInProcessingTimeService pktinProcTime;
183 protected IThreadPoolService threadPool;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800184 protected IFlowService flowService;
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800185 protected ITopoRouteService topoRouteService;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800186 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800187
188 // Configuration options
189 protected int openFlowPort = 6633;
190 protected int workerThreads = 0;
191 // The id for this controller node. Should be unique for each controller
192 // node in a controller cluster.
193 protected String controllerId = "localhost";
194
195 // The current role of the controller.
196 // If the controller isn't configured to support roles, then this is null.
197 protected Role role;
198 // A helper that handles sending and timeout handling for role requests
199 protected RoleChanger roleChanger;
200
201 // Start time of the controller
202 protected long systemStartTime;
203
204 // Flag to always flush flow table on switch reconnect (HA or otherwise)
205 protected boolean alwaysClearFlowsOnSwAdd = false;
206
207 // Storage table names
208 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
209 protected static final String CONTROLLER_ID = "id";
210
211 protected static final String SWITCH_TABLE_NAME = "controller_switch";
212 protected static final String SWITCH_DATAPATH_ID = "dpid";
213 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
214 protected static final String SWITCH_IP = "ip";
215 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
216 protected static final String SWITCH_ACTIVE = "active";
217 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
218 protected static final String SWITCH_CAPABILITIES = "capabilities";
219 protected static final String SWITCH_BUFFERS = "buffers";
220 protected static final String SWITCH_TABLES = "tables";
221 protected static final String SWITCH_ACTIONS = "actions";
222
223 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
224 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
225
226 protected static final String PORT_TABLE_NAME = "controller_port";
227 protected static final String PORT_ID = "id";
228 protected static final String PORT_SWITCH = "switch_id";
229 protected static final String PORT_NUMBER = "number";
230 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
231 protected static final String PORT_NAME = "name";
232 protected static final String PORT_CONFIG = "config";
233 protected static final String PORT_STATE = "state";
234 protected static final String PORT_CURRENT_FEATURES = "current_features";
235 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
236 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
237 protected static final String PORT_PEER_FEATURES = "peer_features";
238
239 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
240 protected static final String CONTROLLER_INTERFACE_ID = "id";
241 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
242 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
243 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
244 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
245
246
247
248 // Perf. related configuration
249 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
250 protected static final int BATCH_MAX_SIZE = 100;
251 protected static final boolean ALWAYS_DECODE_ETH = true;
252
253 /**
254 * Updates handled by the main loop
255 */
256 protected interface IUpdate {
257 /**
258 * Calls the appropriate listeners
259 */
260 public void dispatch();
261 }
/**
 * Kind of change carried by a {@code SwitchUpdate}.
 * Constants are kept in their original declaration order in case ordinals
 * are relied upon elsewhere.
 */
public enum SwitchUpdateType {
    /** A switch was added. */
    ADDED,
    /** A switch was removed. */
    REMOVED,
    /** A port of the switch changed. */
    PORTCHANGED,
    /** A port was added to the switch. */
    PORTADDED,
    /** A port was removed from the switch. */
    PORTREMOVED
}
269 /**
270 * Update message indicating a switch was added or removed
271 */
272 protected class SwitchUpdate implements IUpdate {
273 public IOFSwitch sw;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700274 public OFPhysicalPort port;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800275 public SwitchUpdateType switchUpdateType;
276 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
277 this.sw = sw;
278 this.switchUpdateType = switchUpdateType;
279 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700280 public SwitchUpdate(IOFSwitch sw, OFPhysicalPort port, SwitchUpdateType switchUpdateType) {
281 this.sw = sw;
282 this.port = port;
283 this.switchUpdateType = switchUpdateType;
284 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800285 public void dispatch() {
286 if (log.isTraceEnabled()) {
287 log.trace("Dispatching switch update {} {}",
288 sw, switchUpdateType);
289 }
290 if (switchListeners != null) {
291 for (IOFSwitchListener listener : switchListeners) {
292 switch(switchUpdateType) {
293 case ADDED:
294 listener.addedSwitch(sw);
295 break;
296 case REMOVED:
297 listener.removedSwitch(sw);
298 break;
299 case PORTCHANGED:
300 listener.switchPortChanged(sw.getId());
301 break;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700302 case PORTADDED:
303 listener.switchPortAdded(sw.getId(), port);
304 break;
305 case PORTREMOVED:
306 listener.switchPortRemoved(sw.getId(), port);
307 break;
308 default:
309 break;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800310 }
311 }
312 }
313 }
314 }
315
316 /**
317 * Update message indicating controller's role has changed
318 */
319 protected class HARoleUpdate implements IUpdate {
320 public Role oldRole;
321 public Role newRole;
322 public HARoleUpdate(Role newRole, Role oldRole) {
323 this.oldRole = oldRole;
324 this.newRole = newRole;
325 }
326 public void dispatch() {
327 // Make sure that old and new roles are different.
328 if (oldRole == newRole) {
329 if (log.isTraceEnabled()) {
330 log.trace("HA role update ignored as the old and " +
331 "new roles are the same. newRole = {}" +
332 "oldRole = {}", newRole, oldRole);
333 }
334 return;
335 }
336 if (log.isTraceEnabled()) {
337 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
338 newRole, oldRole);
339 }
340 if (haListeners != null) {
341 for (IHAListener listener : haListeners) {
342 listener.roleChanged(oldRole, newRole);
343 }
344 }
345 }
346 }
347
348 /**
349 * Update message indicating
350 * IPs of controllers in controller cluster have changed.
351 */
352 protected class HAControllerNodeIPUpdate implements IUpdate {
353 public Map<String,String> curControllerNodeIPs;
354 public Map<String,String> addedControllerNodeIPs;
355 public Map<String,String> removedControllerNodeIPs;
356 public HAControllerNodeIPUpdate(
357 HashMap<String,String> curControllerNodeIPs,
358 HashMap<String,String> addedControllerNodeIPs,
359 HashMap<String,String> removedControllerNodeIPs) {
360 this.curControllerNodeIPs = curControllerNodeIPs;
361 this.addedControllerNodeIPs = addedControllerNodeIPs;
362 this.removedControllerNodeIPs = removedControllerNodeIPs;
363 }
364 public void dispatch() {
365 if (log.isTraceEnabled()) {
366 log.trace("Dispatching HA Controller Node IP update "
367 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
368 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
369 removedControllerNodeIPs }
370 );
371 }
372 if (haListeners != null) {
373 for (IHAListener listener: haListeners) {
374 listener.controllerNodeIPsChanged(curControllerNodeIPs,
375 addedControllerNodeIPs, removedControllerNodeIPs);
376 }
377 }
378 }
379 }
380
381 // ***************
382 // Getters/Setters
383 // ***************
384
385 public void setStorageSourceService(IStorageSourceService storageSource) {
386 this.storageSource = storageSource;
387 }
388
389 public void setCounterStore(ICounterStoreService counterStore) {
390 this.counterStore = counterStore;
391 }
392
393 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
394 this.pktinProcTime = pits;
395 }
396
397 public void setRestApiService(IRestApiService restApi) {
398 this.restApi = restApi;
399 }
400
401 public void setThreadPoolService(IThreadPoolService tp) {
402 this.threadPool = tp;
403 }
404
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800405 public void setFlowService(IFlowService serviceImpl) {
406 this.flowService = serviceImpl;
407 }
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800408
409 public void setTopoRouteService(ITopoRouteService serviceImpl) {
410 this.topoRouteService = serviceImpl;
411 }
Jonathan Hartc2e95ee2013-02-22 15:25:11 -0800412
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800413 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800414 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800415 }
416
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800417 @Override
418 public Role getRole() {
419 synchronized(roleChanger) {
420 return role;
421 }
422 }
423
424 @Override
425 public void setRole(Role role) {
426 if (role == null) throw new NullPointerException("Role can not be null.");
427 if (role == Role.MASTER && this.role == Role.SLAVE) {
428 // Reset db state to Inactive for all switches.
429 updateAllInactiveSwitchInfo();
430 }
431
432 // Need to synchronize to ensure a reliable ordering on role request
433 // messages send and to ensure the list of connected switches is stable
434 // RoleChanger will handle the actual sending of the message and
435 // timeout handling
436 // @see RoleChanger
437 synchronized(roleChanger) {
438 if (role.equals(this.role)) {
439 log.debug("Ignoring role change: role is already {}", role);
440 return;
441 }
442
443 Role oldRole = this.role;
444 this.role = role;
445
446 log.debug("Submitting role change request to role {}", role);
447 roleChanger.submitRequest(connectedSwitches, role);
448
449 // Enqueue an update for our listeners.
450 try {
451 this.updates.put(new HARoleUpdate(role, oldRole));
452 } catch (InterruptedException e) {
453 log.error("Failure adding update to queue", e);
454 }
455 }
456 }
457
458
459
460 // **********************
461 // ChannelUpstreamHandler
462 // **********************
463
464 /**
465 * Return a new channel handler for processing a switch connections
466 * @param state The channel state object for the connection
467 * @return the new channel handler
468 */
469 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
470 return new OFChannelHandler(state);
471 }
472
Jonathan Hartcc957a02013-02-26 10:39:04 -0800473 protected class RoleChangeCallback implements ControlChangeCallback {
474 @Override
475 public void controlChanged(long dpid, boolean hasControl) {
476 log.info("Role change callback for switch {}, hasControl {}",
477 HexString.toHexString(dpid), hasControl);
478
479 synchronized(roleChanger){
480 OFSwitchImpl sw = null;
481 for (OFSwitchImpl connectedSw : connectedSwitches){
482 if (connectedSw.getId() == dpid){
483 sw = connectedSw;
484 break;
485 }
486 }
487 if (sw == null){
488 log.warn("Switch {} not found in connected switches",
489 HexString.toHexString(dpid));
490 return;
491 }
492
493 Role role = null;
494
Pankaj Berde01939e92013-03-08 14:38:27 -0800495 /*
496 * issue #229
497 * Cannot rely on sw.getRole() as it can be behind due to pending
498 * role changes in the queue. Just submit it and late the RoleChanger
499 * handle duplicates.
500 */
501
502 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800503 role = Role.MASTER;
504 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800505 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800506 role = Role.SLAVE;
507 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800508
509 log.debug("Sending role request {} msg to {}", role, sw);
510 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
511 swList.add(sw);
512 roleChanger.submitRequest(swList, role);
513
Jonathan Hartcc957a02013-02-26 10:39:04 -0800514 }
515
516 }
517 }
518
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800519 /**
520 * Channel handler deals with the switch connection and dispatches
521 * switch messages to the appropriate locations.
522 * @author readams
523 */
524 protected class OFChannelHandler
525 extends IdleStateAwareChannelUpstreamHandler {
526 protected OFSwitchImpl sw;
527 protected OFChannelState state;
528
529 public OFChannelHandler(OFChannelState state) {
530 this.state = state;
531 }
532
533 @Override
534 @LogMessageDoc(message="New switch connection from {ip address}",
535 explanation="A new switch has connected from the " +
536 "specified IP address")
537 public void channelConnected(ChannelHandlerContext ctx,
538 ChannelStateEvent e) throws Exception {
539 log.info("New switch connection from {}",
540 e.getChannel().getRemoteAddress());
541
542 sw = new OFSwitchImpl();
543 sw.setChannel(e.getChannel());
544 sw.setFloodlightProvider(Controller.this);
545 sw.setThreadPoolService(threadPool);
546
547 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
548 msglist.add(factory.getMessage(OFType.HELLO));
549 e.getChannel().write(msglist);
550
551 }
552
553 @Override
554 @LogMessageDoc(message="Disconnected switch {switch information}",
555 explanation="The specified switch has disconnected.")
556 public void channelDisconnected(ChannelHandlerContext ctx,
557 ChannelStateEvent e) throws Exception {
558 if (sw != null && state.hsState == HandshakeState.READY) {
559 if (activeSwitches.containsKey(sw.getId())) {
560 // It's safe to call removeSwitch even though the map might
561 // not contain this particular switch but another with the
562 // same DPID
563 removeSwitch(sw);
564 }
565 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700566 if (controlRequested) {
567 registryService.releaseControl(sw.getId());
568 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800569 connectedSwitches.remove(sw);
570 }
571 sw.setConnected(false);
572 }
573 log.info("Disconnected switch {}", sw);
574 }
575
/**
 * Classifies an exception raised on this switch channel, logs it, and in most
 * cases closes the channel. A storage failure terminates the controller.
 * Closed-channel and queue-full conditions leave the channel as is.
 */
@Override
@LogMessageDocs({
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to read timeout",
            explanation="The connected switch has failed to send any " +
                        "messages or respond to echo requests",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch}: failed to " +
                    "complete handshake",
            explanation="The switch did not respond correctly " +
                        "to handshake messages",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to IO Error: {}",
            explanation="There was an error communicating with the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to switch " +
                    "state error: {error}",
            explanation="The switch sent an unexpected message",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to " +
                    "message parse failure",
            explanation="Could not parse a message from the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Terminating controller due to storage exception",
            explanation=ERROR_DATABASE,
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Could not process message: queue full",
            explanation="OpenFlow messages are arriving faster than " +
                        " the controller can process them.",
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Error while processing message " +
                    "from switch {switch} {cause}",
            explanation="An error occurred processing the switch message",
            recommendation=LogMessageDoc.GENERIC_ACTION)
})
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    final Throwable cause = e.getCause();
    // Most failures end the connection; branches that must not close
    // the channel clear this flag.
    boolean closeChannel = true;

    // Branch order matters: ClosedChannelException is an IOException, so it
    // must be matched before the generic IOException branch.
    if (cause instanceof ReadTimeoutException) {
        // switch timeout
        log.error("Disconnecting switch {} due to read timeout", sw);
    } else if (cause instanceof HandshakeTimeoutException) {
        log.error("Disconnecting switch {}: failed to complete handshake",
                  sw);
    } else if (cause instanceof ClosedChannelException) {
        // Channel is already closed; nothing to log or close.
        closeChannel = false;
    } else if (cause instanceof IOException) {
        log.error("Disconnecting switch {} due to IO Error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof SwitchStateException) {
        log.error("Disconnecting switch {} due to switch state error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof MessageParseException) {
        log.error("Disconnecting switch " + sw +
                  " due to message parse failure",
                  cause);
    } else if (cause instanceof StorageException) {
        log.error("Terminating controller due to storage exception",
                  cause);
        terminate();
        closeChannel = false;
    } else if (cause instanceof RejectedExecutionException) {
        // NOTE(review): logged at WARN although the annotation above
        // documents this message at level ERROR.
        log.warn("Could not process message: queue full");
        closeChannel = false;
    } else {
        log.error("Error while processing message from switch " + sw,
                  cause);
    }

    if (closeChannel) {
        ctx.getChannel().close();
    }
}
655
656 @Override
657 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
658 throws Exception {
659 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
660 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
661 e.getChannel().write(msglist);
662 }
663
664 @Override
665 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
666 throws Exception {
667 if (e.getMessage() instanceof List) {
668 @SuppressWarnings("unchecked")
669 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
670
671 for (OFMessage ofm : msglist) {
672 try {
673 processOFMessage(ofm);
674 }
675 catch (Exception ex) {
676 // We are the last handler in the stream, so run the
677 // exception through the channel again by passing in
678 // ctx.getChannel().
679 Channels.fireExceptionCaught(ctx.getChannel(), ex);
680 }
681 }
682
683 // Flush all flow-mods/packet-out generated from this "train"
684 OFSwitchImpl.flush_all();
685 }
686 }
687
688 /**
689 * Process the request for the switch description
690 */
691 @LogMessageDoc(level="ERROR",
692 message="Exception in reading description " +
693 " during handshake {exception}",
694 explanation="Could not process the switch description string",
695 recommendation=LogMessageDoc.CHECK_SWITCH)
696 void processSwitchDescReply() {
697 try {
698 // Read description, if it has been updated
699 @SuppressWarnings("unchecked")
700 Future<List<OFStatistics>> desc_future =
701 (Future<List<OFStatistics>>)sw.
702 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
703 List<OFStatistics> values =
704 desc_future.get(0, TimeUnit.MILLISECONDS);
705 if (values != null) {
706 OFDescriptionStatistics description =
707 new OFDescriptionStatistics();
708 ChannelBuffer data =
709 ChannelBuffers.buffer(description.getLength());
710 for (OFStatistics f : values) {
711 f.writeTo(data);
712 description.readFrom(data);
713 break; // SHOULD be a list of length 1
714 }
715 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
716 description);
717 sw.setSwitchProperties(description);
718 data = null;
719
720 // At this time, also set other switch properties from storage
721 boolean is_core_switch = false;
722 IResultSet resultSet = null;
723 try {
724 String swid = sw.getStringId();
725 resultSet =
726 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
727 for (Iterator<IResultSet> it =
728 resultSet.iterator(); it.hasNext();) {
729 // In case of multiple rows, use the status
730 // in last row?
731 Map<String, Object> row = it.next().getRow();
732 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
733 if (log.isDebugEnabled()) {
734 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
735 "config for switch={}, is-core={}",
736 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
737 }
738 String ics =
739 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
740 is_core_switch = ics.equals("true");
741 }
742 }
743 }
744 finally {
745 if (resultSet != null)
746 resultSet.close();
747 }
748 if (is_core_switch) {
749 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
750 new Boolean(true));
751 }
752 }
753 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
754 state.hasDescription = true;
755 checkSwitchReady();
756 }
757 catch (InterruptedException ex) {
758 // Ignore
759 }
760 catch (TimeoutException ex) {
761 // Ignore
762 } catch (Exception ex) {
763 log.error("Exception in reading description " +
764 " during handshake", ex);
765 }
766 }
767
768 /**
769 * Send initial switch setup information that we need before adding
770 * the switch
771 * @throws IOException
772 */
773 void sendHelloConfiguration() throws IOException {
774 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700775 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800776 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
777 }
778
779 /**
780 * Send the configuration requests we can only do after we have
781 * the features reply
782 * @throws IOException
783 */
784 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700785 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800786 // Ensure we receive the full packet via PacketIn
787 OFSetConfig config = (OFSetConfig) factory
788 .getMessage(OFType.SET_CONFIG);
789 config.setMissSendLength((short) 0xffff)
790 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
791 sw.write(config, null);
792 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
793 null);
794
795 // Get Description to set switch-specific flags
796 OFStatisticsRequest req = new OFStatisticsRequest();
797 req.setStatisticType(OFStatisticsType.DESC);
798 req.setLengthU(req.getLengthU());
799 Future<List<OFStatistics>> dfuture =
800 sw.getStatistics(req);
801 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
802 dfuture);
803
804 }
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700805
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700806 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800807 protected void checkSwitchReady() {
808 if (state.hsState == HandshakeState.FEATURES_REPLY &&
809 state.hasDescription && state.hasGetConfigReply) {
810
811 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700812 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800813
814 synchronized(roleChanger) {
815 // We need to keep track of all of the switches that are connected
816 // to the controller, in any role, so that we can later send the
817 // role request messages when the controller role changes.
818 // We need to be synchronized while doing this: we must not
819 // send a another role request to the connectedSwitches until
820 // we were able to add this new switch to connectedSwitches
821 // *and* send the current role to the new switch.
822 connectedSwitches.add(sw);
823
824 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800825 //Put the switch in SLAVE mode until we know we have control
826 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
827 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
828 swList.add(sw);
829 roleChanger.submitRequest(swList, Role.SLAVE);
830
Jonathan Hartcc957a02013-02-26 10:39:04 -0800831 //Request control of the switch from the global registry
832 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700833 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800834 registryService.requestControl(sw.getId(),
835 new RoleChangeCallback());
836 } catch (RegistryException e) {
837 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700838 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800839 }
840
Jonathan Hart97801ac2013-02-26 14:29:16 -0800841
Jonathan Hartcc957a02013-02-26 10:39:04 -0800842
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800843 // Send a role request if role support is enabled for the controller
844 // This is a probe that we'll use to determine if the switch
845 // actually supports the role request message. If it does we'll
846 // get back a role reply message. If it doesn't we'll get back an
847 // OFError message.
848 // If role is MASTER we will promote switch to active
849 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800850 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800851 log.debug("This controller's role is {}, " +
852 "sending initial role request msg to {}",
853 role, sw);
854 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
855 swList.add(sw);
856 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800857 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800858 }
859 else {
860 // Role supported not enabled on controller (for now)
861 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800862 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800863 "not sending role request msg to {}",
864 role, sw);
865 // Need to clear FlowMods before we add the switch
866 // and dispatch updates otherwise we have a race condition.
867 sw.clearAllFlowMods();
868 addSwitch(sw);
869 state.firstRoleReplyReceived = true;
870 }
871 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700872 if (!controlRequested) {
873 // yield to allow other thread(s) to release control
874 try {
875 Thread.sleep(10);
876 } catch (InterruptedException e) {
877 // Ignore interruptions
878 }
879 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700880 log.debug("Closing {} because we weren't able to request control " +
881 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700882 sw.channel.close();
883 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800884 }
885 }
886
887 /* Handle a role reply message we received from the switch. Since
888 * netty serializes message dispatch we don't need to synchronize
889 * against other receive operations from the same switch, so no need
890 * to synchronize addSwitch(), removeSwitch() operations from the same
891 * connection.
892 * FIXME: However, when a switch with the same DPID connects we do
893 * need some synchronization. However, handling switches with same
894 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
895 * removedSwitch notification):1
896 *
897 */
898 @LogMessageDoc(level="ERROR",
899 message="Invalid role value in role reply message",
900 explanation="Was unable to set the HA role (master or slave) " +
901 "for the controller.",
902 recommendation=LogMessageDoc.CHECK_CONTROLLER)
903 protected void handleRoleReplyMessage(OFVendor vendorMessage,
904 OFRoleReplyVendorData roleReplyVendorData) {
905 // Map from the role code in the message to our role enum
906 int nxRole = roleReplyVendorData.getRole();
907 Role role = null;
908 switch (nxRole) {
909 case OFRoleVendorData.NX_ROLE_OTHER:
910 role = Role.EQUAL;
911 break;
912 case OFRoleVendorData.NX_ROLE_MASTER:
913 role = Role.MASTER;
914 break;
915 case OFRoleVendorData.NX_ROLE_SLAVE:
916 role = Role.SLAVE;
917 break;
918 default:
919 log.error("Invalid role value in role reply message");
920 sw.getChannel().close();
921 return;
922 }
923
924 log.debug("Handling role reply for role {} from {}. " +
925 "Controller's role is {} ",
926 new Object[] { role, sw, Controller.this.role}
927 );
928
929 sw.deliverRoleReply(vendorMessage.getXid(), role);
930
931 boolean isActive = activeSwitches.containsKey(sw.getId());
932 if (!isActive && sw.isActive()) {
933 // Transition from SLAVE to MASTER.
934
935 if (!state.firstRoleReplyReceived ||
936 getAlwaysClearFlowsOnSwAdd()) {
937 // This is the first role-reply message we receive from
938 // this switch or roles were disabled when the switch
939 // connected:
940 // Delete all pre-existing flows for new connections to
941 // the master
942 //
943 // FIXME: Need to think more about what the test should
944 // be for when we flush the flow-table? For example,
945 // if all the controllers are temporarily in the backup
946 // role (e.g. right after a failure of the master
947 // controller) at the point the switch connects, then
948 // all of the controllers will initially connect as
949 // backup controllers and not flush the flow-table.
950 // Then when one of them is promoted to master following
951 // the master controller election the flow-table
952 // will still not be flushed because that's treated as
953 // a failover event where we don't want to flush the
954 // flow-table. The end result would be that the flow
955 // table for a newly connected switch is never
956 // flushed. Not sure how to handle that case though...
957 sw.clearAllFlowMods();
958 log.debug("First role reply from master switch {}, " +
959 "clear FlowTable to active switch list",
960 HexString.toHexString(sw.getId()));
961 }
962
963 // Some switches don't seem to update us with port
964 // status messages while in slave role.
965 readSwitchPortStateFromStorage(sw);
966
967 // Only add the switch to the active switch list if
968 // we're not in the slave role. Note that if the role
969 // attribute is null, then that means that the switch
970 // doesn't support the role request messages, so in that
971 // case we're effectively in the EQUAL role and the
972 // switch should be included in the active switch list.
973 addSwitch(sw);
974 log.debug("Added master switch {} to active switch list",
975 HexString.toHexString(sw.getId()));
976
977 }
978 else if (isActive && !sw.isActive()) {
979 // Transition from MASTER to SLAVE: remove switch
980 // from active switch list.
981 log.debug("Removed slave switch {} from active switch" +
982 " list", HexString.toHexString(sw.getId()));
983 removeSwitch(sw);
984 }
985
986 // Indicate that we have received a role reply message.
987 state.firstRoleReplyReceived = true;
988 }
989
990 protected boolean handleVendorMessage(OFVendor vendorMessage) {
991 boolean shouldHandleMessage = false;
992 int vendor = vendorMessage.getVendor();
993 switch (vendor) {
994 case OFNiciraVendorData.NX_VENDOR_ID:
995 OFNiciraVendorData niciraVendorData =
996 (OFNiciraVendorData)vendorMessage.getVendorData();
997 int dataType = niciraVendorData.getDataType();
998 switch (dataType) {
999 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
1000 OFRoleReplyVendorData roleReplyVendorData =
1001 (OFRoleReplyVendorData) niciraVendorData;
1002 handleRoleReplyMessage(vendorMessage,
1003 roleReplyVendorData);
1004 break;
1005 default:
1006 log.warn("Unhandled Nicira VENDOR message; " +
1007 "data type = {}", dataType);
1008 break;
1009 }
1010 break;
1011 default:
1012 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
1013 break;
1014 }
1015
1016 return shouldHandleMessage;
1017 }
1018
1019 /**
1020 * Dispatch an Openflow message from a switch to the appropriate
1021 * handler.
1022 * @param m The message to process
1023 * @throws IOException
1024 * @throws SwitchStateException
1025 */
1026 @LogMessageDocs({
1027 @LogMessageDoc(level="WARN",
1028 message="Config Reply from {switch} has " +
1029 "miss length set to {length}",
1030 explanation="The controller requires that the switch " +
1031 "use a miss length of 0xffff for correct " +
1032 "function",
1033 recommendation="Use a different switch to ensure " +
1034 "correct function"),
1035 @LogMessageDoc(level="WARN",
1036 message="Received ERROR from sw {switch} that "
1037 +"indicates roles are not supported "
1038 +"but we have received a valid "
1039 +"role reply earlier",
1040 explanation="The switch sent a confusing message to the" +
1041 "controller")
1042 })
1043 protected void processOFMessage(OFMessage m)
1044 throws IOException, SwitchStateException {
1045 boolean shouldHandleMessage = false;
1046
1047 switch (m.getType()) {
1048 case HELLO:
1049 if (log.isTraceEnabled())
1050 log.trace("HELLO from {}", sw);
1051
1052 if (state.hsState.equals(HandshakeState.START)) {
1053 state.hsState = HandshakeState.HELLO;
1054 sendHelloConfiguration();
1055 } else {
1056 throw new SwitchStateException("Unexpected HELLO from "
1057 + sw);
1058 }
1059 break;
1060 case ECHO_REQUEST:
1061 OFEchoReply reply =
1062 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
1063 reply.setXid(m.getXid());
1064 sw.write(reply, null);
1065 break;
1066 case ECHO_REPLY:
1067 break;
1068 case FEATURES_REPLY:
1069 if (log.isTraceEnabled())
1070 log.trace("Features Reply from {}", sw);
1071
1072 sw.setFeaturesReply((OFFeaturesReply) m);
1073 if (state.hsState.equals(HandshakeState.HELLO)) {
1074 sendFeatureReplyConfiguration();
1075 state.hsState = HandshakeState.FEATURES_REPLY;
1076 // uncomment to enable "dumb" switches like cbench
1077 // state.hsState = HandshakeState.READY;
1078 // addSwitch(sw);
1079 } else {
1080 // return results to rest api caller
1081 sw.deliverOFFeaturesReply(m);
1082 // update database */
1083 updateActiveSwitchInfo(sw);
1084 }
1085 break;
1086 case GET_CONFIG_REPLY:
1087 if (log.isTraceEnabled())
1088 log.trace("Get config reply from {}", sw);
1089
1090 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
1091 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
1092 throw new SwitchStateException(em);
1093 }
1094 OFGetConfigReply cr = (OFGetConfigReply) m;
1095 if (cr.getMissSendLength() == (short)0xffff) {
1096 log.trace("Config Reply from {} confirms " +
1097 "miss length set to 0xffff", sw);
1098 } else {
1099 log.warn("Config Reply from {} has " +
1100 "miss length set to {}",
1101 sw, cr.getMissSendLength() & 0xffff);
1102 }
1103 state.hasGetConfigReply = true;
1104 checkSwitchReady();
1105 break;
1106 case VENDOR:
1107 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1108 break;
1109 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001110 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001111 // TODO: we need better error handling. Especially for
1112 // request/reply style message (stats, roles) we should have
1113 // a unified way to lookup the xid in the error message.
1114 // This will probable involve rewriting the way we handle
1115 // request/reply style messages.
1116 OFError error = (OFError) m;
1117 boolean shouldLogError = true;
1118 // TODO: should we check that firstRoleReplyReceived is false,
1119 // i.e., check only whether the first request fails?
1120 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1121 boolean isBadVendorError =
1122 (error.getErrorType() == OFError.OFErrorType.
1123 OFPET_BAD_REQUEST.getValue());
1124 // We expect to receive a bad vendor error when
1125 // we're connected to a switch that doesn't support
1126 // the Nicira vendor extensions (i.e. not OVS or
1127 // derived from OVS). By protocol, it should also be
1128 // BAD_VENDOR, but too many switch implementations
1129 // get it wrong and we can already check the xid()
1130 // so we can ignore the type with confidence that this
1131 // is not a spurious error
1132 shouldLogError = !isBadVendorError;
1133 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001134 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001135 if (state.firstRoleReplyReceived && (role != null)) {
1136 log.warn("Received ERROR from sw {} that "
1137 +"indicates roles are not supported "
1138 +"but we have received a valid "
1139 +"role reply earlier", sw);
1140 }
1141 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001142 Role requestedRole =
1143 sw.deliverRoleRequestNotSupported(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001144 synchronized(roleChanger) {
1145 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001146 //This will now never happen. The Controller's role
1147 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001148 // the switch doesn't understand role request
1149 // messages and the current controller role is
1150 // slave. We need to disconnect the switch.
1151 // @see RoleChanger for rationale
Jonathan Hart9e92c512013-03-20 16:24:44 -07001152 log.warn("Closing {} channel because controller's role " +
1153 "is SLAVE", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001154 sw.getChannel().close();
1155 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001156 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001157 log.debug("Adding switch {} because we got an error" +
1158 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001159 // Controller's role is master: add to
1160 // active
1161 // TODO: check if clearing flow table is
1162 // right choice here.
1163 // Need to clear FlowMods before we add the switch
1164 // and dispatch updates otherwise we have a race condition.
1165 // TODO: switch update is async. Won't we still have a potential
1166 // race condition?
1167 sw.clearAllFlowMods();
1168 addSwitch(sw);
1169 }
1170 }
1171 }
1172 else {
1173 // TODO: Is this the right thing to do if we receive
1174 // some other error besides a bad vendor error?
1175 // Presumably that means the switch did actually
1176 // understand the role request message, but there
1177 // was some other error from processing the message.
1178 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1179 // error code, but it doesn't look like the Nicira
1180 // role request has that. Should check OVS source
1181 // code to see if it's possible for any other errors
1182 // to be returned.
1183 // If we received an error the switch is not
1184 // in the correct role, so we need to disconnect it.
1185 // We could also resend the request but then we need to
1186 // check if there are other pending request in which
1187 // case we shouldn't resend. If we do resend we need
1188 // to make sure that the switch eventually accepts one
1189 // of our requests or disconnect the switch. This feels
1190 // cumbersome.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001191 log.debug("Closing {} channel because we recieved an " +
1192 "error other than BAD_VENDOR", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001193 sw.getChannel().close();
1194 }
1195 }
1196 // Once we support OF 1.2, we'd add code to handle it here.
1197 //if (error.getXid() == state.ofRoleRequestXid) {
1198 //}
1199 if (shouldLogError)
1200 logError(sw, error);
1201 break;
1202 case STATS_REPLY:
1203 if (state.hsState.ordinal() <
1204 HandshakeState.FEATURES_REPLY.ordinal()) {
1205 String em = "Unexpected STATS_REPLY from " + sw;
1206 throw new SwitchStateException(em);
1207 }
1208 sw.deliverStatisticsReply(m);
1209 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1210 processSwitchDescReply();
1211 }
1212 break;
1213 case PORT_STATUS:
1214 // We want to update our port state info even if we're in
1215 // the slave role, but we only want to update storage if
1216 // we're the master (or equal).
1217 boolean updateStorage = state.hsState.
1218 equals(HandshakeState.READY) &&
1219 (sw.getRole() != Role.SLAVE);
1220 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1221 shouldHandleMessage = true;
1222 break;
1223
1224 default:
1225 shouldHandleMessage = true;
1226 break;
1227 }
1228
1229 if (shouldHandleMessage) {
1230 sw.getListenerReadLock().lock();
1231 try {
1232 if (sw.isConnected()) {
1233 if (!state.hsState.equals(HandshakeState.READY)) {
1234 log.debug("Ignoring message type {} received " +
1235 "from switch {} before switch is " +
1236 "fully configured.", m.getType(), sw);
1237 }
1238 // Check if the controller is in the slave role for the
1239 // switch. If it is, then don't dispatch the message to
1240 // the listeners.
1241 // TODO: Should we dispatch messages that we expect to
1242 // receive when we're in the slave role, e.g. port
1243 // status messages? Since we're "hiding" switches from
1244 // the listeners when we're in the slave role, then it
1245 // seems a little weird to dispatch port status messages
1246 // to them. On the other hand there might be special
1247 // modules that care about all of the connected switches
1248 // and would like to receive port status notifications.
1249 else if (sw.getRole() == Role.SLAVE) {
1250 // Don't log message if it's a port status message
1251 // since we expect to receive those from the switch
1252 // and don't want to emit spurious messages.
1253 if (m.getType() != OFType.PORT_STATUS) {
1254 log.debug("Ignoring message type {} received " +
1255 "from switch {} while in the slave role.",
1256 m.getType(), sw);
1257 }
1258 } else {
1259 handleMessage(sw, m, null);
1260 }
1261 }
1262 }
1263 finally {
1264 sw.getListenerReadLock().unlock();
1265 }
1266 }
1267 }
1268 }
1269
1270 // ****************
1271 // Message handlers
1272 // ****************
1273
1274 protected void handlePortStatusMessage(IOFSwitch sw,
1275 OFPortStatus m,
1276 boolean updateStorage) {
1277 short portNumber = m.getDesc().getPortNumber();
1278 OFPhysicalPort port = m.getDesc();
1279 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001280 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1281 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001282 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001283 if (!portDown) {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001284 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1285 try {
1286 this.updates.put(update);
1287 } catch (InterruptedException e) {
1288 log.error("Failure adding update to queue", e);
1289 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001290 } else {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001291 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1292 try {
1293 this.updates.put(update);
1294 } catch (InterruptedException e) {
1295 log.error("Failure adding update to queue", e);
1296 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001297 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001298 if (updateStorage)
1299 updatePortInfo(sw, port);
1300 log.debug("Port #{} modified for {}", portNumber, sw);
1301 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1302 sw.setPort(port);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001303 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1304 try {
1305 this.updates.put(update);
1306 } catch (InterruptedException e) {
1307 log.error("Failure adding update to queue", e);
1308 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001309 if (updateStorage)
1310 updatePortInfo(sw, port);
1311 log.debug("Port #{} added for {}", portNumber, sw);
1312 } else if (m.getReason() ==
1313 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1314 sw.deletePort(portNumber);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001315 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1316 try {
1317 this.updates.put(update);
1318 } catch (InterruptedException e) {
1319 log.error("Failure adding update to queue", e);
1320 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001321 if (updateStorage)
1322 removePortInfo(sw, portNumber);
1323 log.debug("Port #{} deleted for {}", portNumber, sw);
1324 }
1325 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1326 try {
1327 this.updates.put(update);
1328 } catch (InterruptedException e) {
1329 log.error("Failure adding update to queue", e);
1330 }
1331 }
1332
1333 /**
1334 * flcontext_cache - Keep a thread local stack of contexts
1335 */
1336 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1337 new ThreadLocal <Stack<FloodlightContext>> () {
1338 @Override
1339 protected Stack<FloodlightContext> initialValue() {
1340 return new Stack<FloodlightContext>();
1341 }
1342 };
1343
1344 /**
1345 * flcontext_alloc - pop a context off the stack, if required create a new one
1346 * @return FloodlightContext
1347 */
1348 protected static FloodlightContext flcontext_alloc() {
1349 FloodlightContext flcontext = null;
1350
1351 if (flcontext_cache.get().empty()) {
1352 flcontext = new FloodlightContext();
1353 }
1354 else {
1355 flcontext = flcontext_cache.get().pop();
1356 }
1357
1358 return flcontext;
1359 }
1360
1361 /**
1362 * flcontext_free - Free the context to the current thread
1363 * @param flcontext
1364 */
1365 protected void flcontext_free(FloodlightContext flcontext) {
1366 flcontext.getStorage().clear();
1367 flcontext_cache.get().push(flcontext);
1368 }
1369
1370 /**
1371 * Handle replies to certain OFMessages, and pass others off to listeners
1372 * @param sw The switch for the message
1373 * @param m The message
1374 * @param bContext The floodlight context. If null then floodlight context would
1375 * be allocated in this function
1376 * @throws IOException
1377 */
1378 @LogMessageDocs({
1379 @LogMessageDoc(level="ERROR",
1380 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1381 " field is empty.",
1382 explanation="The switch sent an improperly-formatted PacketIn" +
1383 " message",
1384 recommendation=LogMessageDoc.CHECK_SWITCH),
1385 @LogMessageDoc(level="WARN",
1386 message="Unhandled OF Message: {} from {}",
1387 explanation="The switch sent a message not handled by " +
1388 "the controller")
1389 })
1390 protected void handleMessage(IOFSwitch sw, OFMessage m,
1391 FloodlightContext bContext)
1392 throws IOException {
1393 Ethernet eth = null;
1394
1395 switch (m.getType()) {
1396 case PACKET_IN:
1397 OFPacketIn pi = (OFPacketIn)m;
1398
1399 if (pi.getPacketData().length <= 0) {
1400 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1401 ") because the data field is empty.");
1402 return;
1403 }
1404
1405 if (Controller.ALWAYS_DECODE_ETH) {
1406 eth = new Ethernet();
1407 eth.deserialize(pi.getPacketData(), 0,
1408 pi.getPacketData().length);
1409 counterStore.updatePacketInCounters(sw, m, eth);
1410 }
1411 // fall through to default case...
1412
1413 default:
1414
1415 List<IOFMessageListener> listeners = null;
1416 if (messageListeners.containsKey(m.getType())) {
1417 listeners = messageListeners.get(m.getType()).
1418 getOrderedListeners();
1419 }
1420
1421 FloodlightContext bc = null;
1422 if (listeners != null) {
1423 // Check if floodlight context is passed from the calling
1424 // function, if so use that floodlight context, otherwise
1425 // allocate one
1426 if (bContext == null) {
1427 bc = flcontext_alloc();
1428 } else {
1429 bc = bContext;
1430 }
1431 if (eth != null) {
1432 IFloodlightProviderService.bcStore.put(bc,
1433 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1434 eth);
1435 }
1436
1437 // Get the starting time (overall and per-component) of
1438 // the processing chain for this packet if performance
1439 // monitoring is turned on
1440 pktinProcTime.bootstrap(listeners);
1441 pktinProcTime.recordStartTimePktIn();
1442 Command cmd;
1443 for (IOFMessageListener listener : listeners) {
1444 if (listener instanceof IOFSwitchFilter) {
1445 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1446 continue;
1447 }
1448 }
1449
1450 pktinProcTime.recordStartTimeComp(listener);
1451 cmd = listener.receive(sw, m, bc);
1452 pktinProcTime.recordEndTimeComp(listener);
1453
1454 if (Command.STOP.equals(cmd)) {
1455 break;
1456 }
1457 }
1458 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1459 } else {
1460 log.warn("Unhandled OF Message: {} from {}", m, sw);
1461 }
1462
1463 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1464 }
1465 }
1466
1467 /**
1468 * Log an OpenFlow error message from a switch
1469 * @param sw The switch that sent the error
1470 * @param error The error message
1471 */
1472 @LogMessageDoc(level="ERROR",
1473 message="Error {error type} {error code} from {switch}",
1474 explanation="The switch responded with an unexpected error" +
1475 "to an OpenFlow message from the controller",
1476 recommendation="This could indicate improper network operation. " +
1477 "If the problem persists restarting the switch and " +
1478 "controller may help."
1479 )
1480 protected void logError(IOFSwitch sw, OFError error) {
1481 int etint = 0xffff & error.getErrorType();
1482 if (etint < 0 || etint >= OFErrorType.values().length) {
1483 log.error("Unknown error code {} from sw {}", etint, sw);
1484 }
1485 OFErrorType et = OFErrorType.values()[etint];
1486 switch (et) {
1487 case OFPET_HELLO_FAILED:
1488 OFHelloFailedCode hfc =
1489 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1490 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1491 break;
1492 case OFPET_BAD_REQUEST:
1493 OFBadRequestCode brc =
1494 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1495 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1496 break;
1497 case OFPET_BAD_ACTION:
1498 OFBadActionCode bac =
1499 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1500 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1501 break;
1502 case OFPET_FLOW_MOD_FAILED:
1503 OFFlowModFailedCode fmfc =
1504 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1505 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1506 break;
1507 case OFPET_PORT_MOD_FAILED:
1508 OFPortModFailedCode pmfc =
1509 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1510 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1511 break;
1512 case OFPET_QUEUE_OP_FAILED:
1513 OFQueueOpFailedCode qofc =
1514 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1515 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1516 break;
1517 default:
1518 break;
1519 }
1520 }
1521
1522 /**
1523 * Add a switch to the active switch list and call the switch listeners.
1524 * This happens either when a switch first connects (and the controller is
1525 * not in the slave role) or when the role of the controller changes from
1526 * slave to master.
1527 * @param sw the switch that has been added
1528 */
1529 // TODO: need to rethink locking and the synchronous switch update.
1530 // We can / should also handle duplicate DPIDs in connectedSwitches
1531 @LogMessageDoc(level="ERROR",
1532 message="New switch added {switch} for already-added switch {switch}",
1533 explanation="A switch with the same DPID as another switch " +
1534 "connected to the controller. This can be caused by " +
1535 "multiple switches configured with the same DPID, or " +
1536 "by a switch reconnected very quickly after " +
1537 "disconnecting.",
1538 recommendation="If this happens repeatedly, it is likely there " +
1539 "are switches with duplicate DPIDs on the network. " +
1540 "Reconfigure the appropriate switches. If it happens " +
1541 "very rarely, then it is likely this is a transient " +
1542 "network problem that can be ignored."
1543 )
1544 protected void addSwitch(IOFSwitch sw) {
1545 // TODO: is it safe to modify the HashMap without holding
1546 // the old switch's lock?
1547 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1548 if (sw == oldSw) {
1549 // Note == for object equality, not .equals for value
1550 log.info("New add switch for pre-existing switch {}", sw);
1551 return;
1552 }
1553
1554 if (oldSw != null) {
1555 oldSw.getListenerWriteLock().lock();
1556 try {
1557 log.error("New switch added {} for already-added switch {}",
1558 sw, oldSw);
1559 // Set the connected flag to false to suppress calling
1560 // the listeners for this switch in processOFMessage
1561 oldSw.setConnected(false);
1562
1563 oldSw.cancelAllStatisticsReplies();
1564
1565 updateInactiveSwitchInfo(oldSw);
1566
1567 // we need to clean out old switch state definitively
1568 // before adding the new switch
1569 // FIXME: It seems not completely kosher to call the
1570 // switch listeners here. I thought one of the points of
1571 // having the asynchronous switch update mechanism was so
1572 // the addedSwitch and removedSwitch were always called
1573 // from a single thread to simplify concurrency issues
1574 // for the listener.
1575 if (switchListeners != null) {
1576 for (IOFSwitchListener listener : switchListeners) {
1577 listener.removedSwitch(oldSw);
1578 }
1579 }
1580 // will eventually trigger a removeSwitch(), which will cause
1581 // a "Not removing Switch ... already removed debug message.
1582 // TODO: Figure out a way to handle this that avoids the
1583 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001584 log.debug("Closing {} because a new IOFSwitch got added " +
1585 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001586 oldSw.getChannel().close();
1587 }
1588 finally {
1589 oldSw.getListenerWriteLock().unlock();
1590 }
1591 }
1592
1593 updateActiveSwitchInfo(sw);
1594 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1595 try {
1596 this.updates.put(update);
1597 } catch (InterruptedException e) {
1598 log.error("Failure adding update to queue", e);
1599 }
1600 }
1601
1602 /**
1603 * Remove a switch from the active switch list and call the switch listeners.
1604 * This happens either when the switch is disconnected or when the
1605 * controller's role for the switch changes from master to slave.
1606 * @param sw the switch that has been removed
1607 */
1608 protected void removeSwitch(IOFSwitch sw) {
1609 // No need to acquire the listener lock, since
1610 // this method is only called after netty has processed all
1611 // pending messages
1612 log.debug("removeSwitch: {}", sw);
1613 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1614 log.debug("Not removing switch {}; already removed", sw);
1615 return;
1616 }
1617 // We cancel all outstanding statistics replies if the switch transition
1618 // from active. In the future we might allow statistics requests
1619 // from slave controllers. Then we need to move this cancelation
1620 // to switch disconnect
1621 sw.cancelAllStatisticsReplies();
1622
1623 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1624 // here if role support is enabled. In that case if the switch is being
1625 // removed because we've been switched to being in the slave role, then I think
1626 // it's possible that the new master may have already been promoted to master
1627 // and written out the active switch state to storage. If we now execute
1628 // updateInactiveSwitchInfo we may wipe out all of the state that was
1629 // written out by the new master. Maybe need to revisit how we handle all
1630 // of the switch state that's written to storage.
1631
1632 updateInactiveSwitchInfo(sw);
1633 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1634 try {
1635 this.updates.put(update);
1636 } catch (InterruptedException e) {
1637 log.error("Failure adding update to queue", e);
1638 }
1639 }
1640
1641 // ***************
1642 // IFloodlightProvider
1643 // ***************
1644
1645 @Override
1646 public synchronized void addOFMessageListener(OFType type,
1647 IOFMessageListener listener) {
1648 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1649 messageListeners.get(type);
1650 if (ldd == null) {
1651 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1652 messageListeners.put(type, ldd);
1653 }
1654 ldd.addListener(type, listener);
1655 }
1656
1657 @Override
1658 public synchronized void removeOFMessageListener(OFType type,
1659 IOFMessageListener listener) {
1660 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1661 messageListeners.get(type);
1662 if (ldd != null) {
1663 ldd.removeListener(listener);
1664 }
1665 }
1666
1667 private void logListeners() {
1668 for (Map.Entry<OFType,
1669 ListenerDispatcher<OFType,
1670 IOFMessageListener>> entry
1671 : messageListeners.entrySet()) {
1672
1673 OFType type = entry.getKey();
1674 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1675 entry.getValue();
1676
1677 StringBuffer sb = new StringBuffer();
1678 sb.append("OFListeners for ");
1679 sb.append(type);
1680 sb.append(": ");
1681 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1682 sb.append(l.getName());
1683 sb.append(",");
1684 }
1685 log.debug(sb.toString());
1686 }
1687 }
1688
1689 public void removeOFMessageListeners(OFType type) {
1690 messageListeners.remove(type);
1691 }
1692
1693 @Override
1694 public Map<Long, IOFSwitch> getSwitches() {
1695 return Collections.unmodifiableMap(this.activeSwitches);
1696 }
1697
1698 @Override
1699 public void addOFSwitchListener(IOFSwitchListener listener) {
1700 this.switchListeners.add(listener);
1701 }
1702
1703 @Override
1704 public void removeOFSwitchListener(IOFSwitchListener listener) {
1705 this.switchListeners.remove(listener);
1706 }
1707
1708 @Override
1709 public Map<OFType, List<IOFMessageListener>> getListeners() {
1710 Map<OFType, List<IOFMessageListener>> lers =
1711 new HashMap<OFType, List<IOFMessageListener>>();
1712 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1713 messageListeners.entrySet()) {
1714 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1715 }
1716 return Collections.unmodifiableMap(lers);
1717 }
1718
1719 @Override
1720 @LogMessageDocs({
1721 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1722 "a null switch",
1723 explanation="Failed to process a message because the switch " +
1724 " is no longer connected."),
1725 @LogMessageDoc(level="ERROR",
1726 message="Error reinjecting OFMessage on switch {switch}",
1727 explanation="An I/O error occured while attempting to " +
1728 "process an OpenFlow message",
1729 recommendation=LogMessageDoc.CHECK_SWITCH)
1730 })
1731 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1732 FloodlightContext bc) {
1733 if (sw == null) {
1734 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1735 return false;
1736 }
1737
1738 // FIXME: Do we need to be able to inject messages to switches
1739 // where we're the slave controller (i.e. they're connected but
1740 // not active)?
1741 // FIXME: Don't we need synchronization logic here so we're holding
1742 // the listener read lock when we call handleMessage? After some
1743 // discussions it sounds like the right thing to do here would be to
1744 // inject the message as a netty upstream channel event so it goes
1745 // through the normal netty event processing, including being
1746 // handled
1747 if (!activeSwitches.containsKey(sw.getId())) return false;
1748
1749 try {
1750 // Pass Floodlight context to the handleMessages()
1751 handleMessage(sw, msg, bc);
1752 } catch (IOException e) {
1753 log.error("Error reinjecting OFMessage on switch {}",
1754 HexString.toHexString(sw.getId()));
1755 return false;
1756 }
1757 return true;
1758 }
1759
1760 @Override
1761 @LogMessageDoc(message="Calling System.exit",
1762 explanation="The controller is terminating")
1763 public synchronized void terminate() {
1764 log.info("Calling System.exit");
1765 System.exit(1);
1766 }
1767
1768 @Override
1769 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1770 // call the overloaded version with floodlight context set to null
1771 return injectOfMessage(sw, msg, null);
1772 }
1773
1774 @Override
1775 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1776 FloodlightContext bc) {
1777 if (log.isTraceEnabled()) {
1778 String str = OFMessage.getDataAsString(sw, m, bc);
1779 log.trace("{}", str);
1780 }
1781
1782 List<IOFMessageListener> listeners = null;
1783 if (messageListeners.containsKey(m.getType())) {
1784 listeners =
1785 messageListeners.get(m.getType()).getOrderedListeners();
1786 }
1787
1788 if (listeners != null) {
1789 for (IOFMessageListener listener : listeners) {
1790 if (listener instanceof IOFSwitchFilter) {
1791 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1792 continue;
1793 }
1794 }
1795 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1796 break;
1797 }
1798 }
1799 }
1800 }
1801
1802 @Override
1803 public BasicFactory getOFMessageFactory() {
1804 return factory;
1805 }
1806
1807 @Override
1808 public String getControllerId() {
1809 return controllerId;
1810 }
1811
1812 // **************
1813 // Initialization
1814 // **************
1815
1816 protected void updateAllInactiveSwitchInfo() {
1817 if (role == Role.SLAVE) {
1818 return;
1819 }
1820 String controllerId = getControllerId();
1821 String[] switchColumns = { SWITCH_DATAPATH_ID,
1822 SWITCH_CONTROLLER_ID,
1823 SWITCH_ACTIVE };
1824 String[] portColumns = { PORT_ID, PORT_SWITCH };
1825 IResultSet switchResultSet = null;
1826 try {
1827 OperatorPredicate op =
1828 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1829 OperatorPredicate.Operator.EQ,
1830 controllerId);
1831 switchResultSet =
1832 storageSource.executeQuery(SWITCH_TABLE_NAME,
1833 switchColumns,
1834 op, null);
1835 while (switchResultSet.next()) {
1836 IResultSet portResultSet = null;
1837 try {
1838 String datapathId =
1839 switchResultSet.getString(SWITCH_DATAPATH_ID);
1840 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1841 op = new OperatorPredicate(PORT_SWITCH,
1842 OperatorPredicate.Operator.EQ,
1843 datapathId);
1844 portResultSet =
1845 storageSource.executeQuery(PORT_TABLE_NAME,
1846 portColumns,
1847 op, null);
1848 while (portResultSet.next()) {
1849 portResultSet.deleteRow();
1850 }
1851 portResultSet.save();
1852 }
1853 finally {
1854 if (portResultSet != null)
1855 portResultSet.close();
1856 }
1857 }
1858 switchResultSet.save();
1859 }
1860 finally {
1861 if (switchResultSet != null)
1862 switchResultSet.close();
1863 }
1864 }
1865
1866 protected void updateControllerInfo() {
1867 updateAllInactiveSwitchInfo();
1868
1869 // Write out the controller info to the storage source
1870 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1871 String id = getControllerId();
1872 controllerInfo.put(CONTROLLER_ID, id);
1873 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1874 }
1875
1876 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1877 if (role == Role.SLAVE) {
1878 return;
1879 }
1880 // Obtain the row info for the switch
1881 Map<String, Object> switchInfo = new HashMap<String, Object>();
1882 String datapathIdString = sw.getStringId();
1883 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1884 String controllerId = getControllerId();
1885 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1886 Date connectedSince = sw.getConnectedSince();
1887 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1888 Channel channel = sw.getChannel();
1889 SocketAddress socketAddress = channel.getRemoteAddress();
1890 if (socketAddress != null) {
1891 String socketAddressString = socketAddress.toString();
1892 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1893 if (socketAddress instanceof InetSocketAddress) {
1894 InetSocketAddress inetSocketAddress =
1895 (InetSocketAddress)socketAddress;
1896 InetAddress inetAddress = inetSocketAddress.getAddress();
1897 String ip = inetAddress.getHostAddress();
1898 switchInfo.put(SWITCH_IP, ip);
1899 }
1900 }
1901
1902 // Write out the switch features info
1903 long capabilities = U32.f(sw.getCapabilities());
1904 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1905 long buffers = U32.f(sw.getBuffers());
1906 switchInfo.put(SWITCH_BUFFERS, buffers);
1907 long tables = U32.f(sw.getTables());
1908 switchInfo.put(SWITCH_TABLES, tables);
1909 long actions = U32.f(sw.getActions());
1910 switchInfo.put(SWITCH_ACTIONS, actions);
1911 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1912
1913 // Update the switch
1914 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1915
1916 // Update the ports
1917 for (OFPhysicalPort port: sw.getPorts()) {
1918 updatePortInfo(sw, port);
1919 }
1920 }
1921
1922 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1923 if (role == Role.SLAVE) {
1924 return;
1925 }
1926 log.debug("Update DB with inactiveSW {}", sw);
1927 // Update the controller info in the storage source to be inactive
1928 Map<String, Object> switchInfo = new HashMap<String, Object>();
1929 String datapathIdString = sw.getStringId();
1930 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1931 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1932 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1933 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1934 }
1935
1936 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1937 if (role == Role.SLAVE) {
1938 return;
1939 }
1940 String datapathIdString = sw.getStringId();
1941 Map<String, Object> portInfo = new HashMap<String, Object>();
1942 int portNumber = U16.f(port.getPortNumber());
1943 String id = datapathIdString + "|" + portNumber;
1944 portInfo.put(PORT_ID, id);
1945 portInfo.put(PORT_SWITCH, datapathIdString);
1946 portInfo.put(PORT_NUMBER, portNumber);
1947 byte[] hardwareAddress = port.getHardwareAddress();
1948 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1949 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1950 String name = port.getName();
1951 portInfo.put(PORT_NAME, name);
1952 long config = U32.f(port.getConfig());
1953 portInfo.put(PORT_CONFIG, config);
1954 long state = U32.f(port.getState());
1955 portInfo.put(PORT_STATE, state);
1956 long currentFeatures = U32.f(port.getCurrentFeatures());
1957 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1958 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1959 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1960 long supportedFeatures = U32.f(port.getSupportedFeatures());
1961 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1962 long peerFeatures = U32.f(port.getPeerFeatures());
1963 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1964 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1965 }
1966
1967 /**
1968 * Read switch port data from storage and write it into a switch object
1969 * @param sw the switch to update
1970 */
1971 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1972 OperatorPredicate op =
1973 new OperatorPredicate(PORT_SWITCH,
1974 OperatorPredicate.Operator.EQ,
1975 sw.getStringId());
1976 IResultSet portResultSet =
1977 storageSource.executeQuery(PORT_TABLE_NAME,
1978 null, op, null);
1979 //Map<Short, OFPhysicalPort> oldports =
1980 // new HashMap<Short, OFPhysicalPort>();
1981 //oldports.putAll(sw.getPorts());
1982
1983 while (portResultSet.next()) {
1984 try {
1985 OFPhysicalPort p = new OFPhysicalPort();
1986 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1987 p.setName(portResultSet.getString(PORT_NAME));
1988 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1989 p.setState((int)portResultSet.getLong(PORT_STATE));
1990 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1991 p.setHardwareAddress(HexString.fromHexString(portMac));
1992 p.setCurrentFeatures((int)portResultSet.
1993 getLong(PORT_CURRENT_FEATURES));
1994 p.setAdvertisedFeatures((int)portResultSet.
1995 getLong(PORT_ADVERTISED_FEATURES));
1996 p.setSupportedFeatures((int)portResultSet.
1997 getLong(PORT_SUPPORTED_FEATURES));
1998 p.setPeerFeatures((int)portResultSet.
1999 getLong(PORT_PEER_FEATURES));
2000 //oldports.remove(Short.valueOf(p.getPortNumber()));
2001 sw.setPort(p);
2002 } catch (NullPointerException e) {
2003 // ignore
2004 }
2005 }
2006 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
2007 try {
2008 this.updates.put(update);
2009 } catch (InterruptedException e) {
2010 log.error("Failure adding update to queue", e);
2011 }
2012 }
2013
2014 protected void removePortInfo(IOFSwitch sw, short portNumber) {
2015 if (role == Role.SLAVE) {
2016 return;
2017 }
2018 String datapathIdString = sw.getStringId();
2019 String id = datapathIdString + "|" + portNumber;
2020 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
2021 }
2022
2023 /**
2024 * Sets the initial role based on properties in the config params.
2025 * It looks for two different properties.
2026 * If the "role" property is specified then the value should be
2027 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
2028 * controller is set to the specified value. If the "role" property
2029 * is not specified then it looks next for the "role.path" property.
2030 * In this case the value should be the path to a property file in
2031 * the file system that contains a property called "floodlight.role"
2032 * which can be one of the values listed above for the "role" property.
2033 * The idea behind the "role.path" mechanism is that you have some
2034 * separate heartbeat and master controller election algorithm that
2035 * determines the role of the controller. When a role transition happens,
2036 * it updates the current role in the file specified by the "role.path"
2037 * file. Then if floodlight restarts for some reason it can get the
2038 * correct current role of the controller from the file.
2039 * @param configParams The config params for the FloodlightProvider service
2040 * @return A valid role if role information is specified in the
2041 * config params, otherwise null
2042 */
2043 @LogMessageDocs({
2044 @LogMessageDoc(message="Controller role set to {role}",
2045 explanation="Setting the initial HA role to "),
2046 @LogMessageDoc(level="ERROR",
2047 message="Invalid current role value: {role}",
2048 explanation="An invalid HA role value was read from the " +
2049 "properties file",
2050 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2051 })
2052 protected Role getInitialRole(Map<String, String> configParams) {
2053 Role role = null;
2054 String roleString = configParams.get("role");
2055 if (roleString == null) {
2056 String rolePath = configParams.get("rolepath");
2057 if (rolePath != null) {
2058 Properties properties = new Properties();
2059 try {
2060 properties.load(new FileInputStream(rolePath));
2061 roleString = properties.getProperty("floodlight.role");
2062 }
2063 catch (IOException exc) {
2064 // Don't treat it as an error if the file specified by the
2065 // rolepath property doesn't exist. This lets us enable the
2066 // HA mechanism by just creating/setting the floodlight.role
2067 // property in that file without having to modify the
2068 // floodlight properties.
2069 }
2070 }
2071 }
2072
2073 if (roleString != null) {
2074 // Canonicalize the string to the form used for the enum constants
2075 roleString = roleString.trim().toUpperCase();
2076 try {
2077 role = Role.valueOf(roleString);
2078 }
2079 catch (IllegalArgumentException exc) {
2080 log.error("Invalid current role value: {}", roleString);
2081 }
2082 }
2083
2084 log.info("Controller role set to {}", role);
2085
2086 return role;
2087 }
2088
2089 /**
2090 * Tell controller that we're ready to accept switches loop
2091 * @throws IOException
2092 */
2093 @LogMessageDocs({
2094 @LogMessageDoc(message="Listening for switch connections on {address}",
2095 explanation="The controller is ready and listening for new" +
2096 " switch connections"),
2097 @LogMessageDoc(message="Storage exception in controller " +
2098 "updates loop; terminating process",
2099 explanation=ERROR_DATABASE,
2100 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2101 @LogMessageDoc(level="ERROR",
2102 message="Exception in controller updates loop",
2103 explanation="Failed to dispatch controller event",
2104 recommendation=LogMessageDoc.GENERIC_ACTION)
2105 })
2106 public void run() {
2107 if (log.isDebugEnabled()) {
2108 logListeners();
2109 }
2110
2111 try {
2112 final ServerBootstrap bootstrap = createServerBootStrap();
2113
2114 bootstrap.setOption("reuseAddr", true);
2115 bootstrap.setOption("child.keepAlive", true);
2116 bootstrap.setOption("child.tcpNoDelay", true);
2117 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2118
2119 ChannelPipelineFactory pfact =
2120 new OpenflowPipelineFactory(this, null);
2121 bootstrap.setPipelineFactory(pfact);
2122 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2123 final ChannelGroup cg = new DefaultChannelGroup();
2124 cg.add(bootstrap.bind(sa));
2125
2126 log.info("Listening for switch connections on {}", sa);
2127 } catch (Exception e) {
2128 throw new RuntimeException(e);
2129 }
2130
2131 // main loop
2132 while (true) {
2133 try {
2134 IUpdate update = updates.take();
2135 update.dispatch();
2136 } catch (InterruptedException e) {
2137 return;
2138 } catch (StorageException e) {
2139 log.error("Storage exception in controller " +
2140 "updates loop; terminating process", e);
2141 return;
2142 } catch (Exception e) {
2143 log.error("Exception in controller updates loop", e);
2144 }
2145 }
2146 }
2147
2148 private ServerBootstrap createServerBootStrap() {
2149 if (workerThreads == 0) {
2150 return new ServerBootstrap(
2151 new NioServerSocketChannelFactory(
2152 Executors.newCachedThreadPool(),
2153 Executors.newCachedThreadPool()));
2154 } else {
2155 return new ServerBootstrap(
2156 new NioServerSocketChannelFactory(
2157 Executors.newCachedThreadPool(),
2158 Executors.newCachedThreadPool(), workerThreads));
2159 }
2160 }
2161
2162 public void setConfigParams(Map<String, String> configParams) {
2163 String ofPort = configParams.get("openflowport");
2164 if (ofPort != null) {
2165 this.openFlowPort = Integer.parseInt(ofPort);
2166 }
2167 log.debug("OpenFlow port set to {}", this.openFlowPort);
2168 String threads = configParams.get("workerthreads");
2169 if (threads != null) {
2170 this.workerThreads = Integer.parseInt(threads);
2171 }
2172 log.debug("Number of worker threads set to {}", this.workerThreads);
2173 String controllerId = configParams.get("controllerid");
2174 if (controllerId != null) {
2175 this.controllerId = controllerId;
2176 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002177 else {
2178 //Try to get the hostname of the machine and use that for controller ID
2179 try {
2180 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2181 this.controllerId = hostname;
2182 } catch (UnknownHostException e) {
2183 // Can't get hostname, we'll just use the default
2184 }
2185 }
2186
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002187 log.debug("ControllerId set to {}", this.controllerId);
2188 }
2189
2190 private void initVendorMessages() {
2191 // Configure openflowj to be able to parse the role request/reply
2192 // vendor messages.
2193 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2194 OFNiciraVendorData.NX_VENDOR_ID, 4);
2195 OFVendorId.registerVendorId(niciraVendorId);
2196 OFBasicVendorDataType roleRequestVendorData =
2197 new OFBasicVendorDataType(
2198 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2199 OFRoleRequestVendorData.getInstantiable());
2200 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2201 OFBasicVendorDataType roleReplyVendorData =
2202 new OFBasicVendorDataType(
2203 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2204 OFRoleReplyVendorData.getInstantiable());
2205 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2206 }
2207
2208 /**
2209 * Initialize internal data structures
2210 */
2211 public void init(Map<String, String> configParams) {
2212 // These data structures are initialized here because other
2213 // module's startUp() might be called before ours
2214 this.messageListeners =
2215 new ConcurrentHashMap<OFType,
2216 ListenerDispatcher<OFType,
2217 IOFMessageListener>>();
2218 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2219 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2220 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2221 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2222 this.controllerNodeIPsCache = new HashMap<String, String>();
2223 this.updates = new LinkedBlockingQueue<IUpdate>();
2224 this.factory = new BasicFactory();
2225 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2226 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002227 //this.role = getInitialRole(configParams);
2228 //Set the controller's role to MASTER so it always tries to do role requests.
2229 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002230 this.roleChanger = new RoleChanger();
2231 initVendorMessages();
2232 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002233 }
2234
2235 /**
2236 * Startup all of the controller's components
2237 */
2238 @LogMessageDoc(message="Waiting for storage source",
2239 explanation="The system database is not yet ready",
2240 recommendation="If this message persists, this indicates " +
2241 "that the system database has failed to start. " +
2242 LogMessageDoc.CHECK_CONTROLLER)
2243 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002244 try {
2245 registryService.registerController(controllerId);
2246 } catch (RegistryException e2) {
2247 log.warn("Registry service error: {}", e2.getMessage());
2248 }
2249
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002250 // Create the table names we use
2251 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2252 storageSource.createTable(SWITCH_TABLE_NAME, null);
2253 storageSource.createTable(PORT_TABLE_NAME, null);
2254 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2255 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2256 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2257 CONTROLLER_ID);
2258 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2259 SWITCH_DATAPATH_ID);
2260 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2261 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2262 CONTROLLER_INTERFACE_ID);
2263 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2264
2265 while (true) {
2266 try {
2267 updateControllerInfo();
2268 break;
2269 }
2270 catch (StorageException e) {
2271 log.info("Waiting for storage source");
2272 try {
2273 Thread.sleep(1000);
2274 } catch (InterruptedException e1) {
2275 }
2276 }
2277 }
2278
2279 // Add our REST API
2280 restApi.addRestletRoutable(new CoreWebRoutable());
2281 }
2282
2283 @Override
2284 public void addInfoProvider(String type, IInfoProvider provider) {
2285 if (!providerMap.containsKey(type)) {
2286 providerMap.put(type, new ArrayList<IInfoProvider>());
2287 }
2288 providerMap.get(type).add(provider);
2289 }
2290
2291 @Override
2292 public void removeInfoProvider(String type, IInfoProvider provider) {
2293 if (!providerMap.containsKey(type)) {
2294 log.debug("Provider type {} doesn't exist.", type);
2295 return;
2296 }
2297
2298 providerMap.get(type).remove(provider);
2299 }
2300
2301 public Map<String, Object> getControllerInfo(String type) {
2302 if (!providerMap.containsKey(type)) return null;
2303
2304 Map<String, Object> result = new LinkedHashMap<String, Object>();
2305 for (IInfoProvider provider : providerMap.get(type)) {
2306 result.putAll(provider.getInfo(type));
2307 }
2308
2309 return result;
2310 }
2311
2312 @Override
2313 public void addHAListener(IHAListener listener) {
2314 this.haListeners.add(listener);
2315 }
2316
2317 @Override
2318 public void removeHAListener(IHAListener listener) {
2319 this.haListeners.remove(listener);
2320 }
2321
2322
2323 /**
2324 * Handle changes to the controller nodes IPs and dispatch update.
2325 */
2326 @SuppressWarnings("unchecked")
2327 protected void handleControllerNodeIPChanges() {
2328 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2329 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2330 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2331 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2332 CONTROLLER_INTERFACE_TYPE,
2333 CONTROLLER_INTERFACE_NUMBER,
2334 CONTROLLER_INTERFACE_DISCOVERED_IP };
2335 synchronized(controllerNodeIPsCache) {
2336 // We currently assume that interface Ethernet0 is the relevant
2337 // controller interface. Might change.
2338 // We could (should?) implement this using
2339 // predicates, but creating the individual and compound predicate
2340 // seems more overhead then just checking every row. Particularly,
2341 // since the number of rows is small and changes infrequent
2342 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2343 colNames,null, null);
2344 while (res.next()) {
2345 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2346 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2347 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2348 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2349 String curIP = controllerNodeIPsCache.get(controllerID);
2350
2351 curControllerNodeIPs.put(controllerID, discoveredIP);
2352 if (curIP == null) {
2353 // new controller node IP
2354 addedControllerNodeIPs.put(controllerID, discoveredIP);
2355 }
2356 else if (curIP != discoveredIP) {
2357 // IP changed
2358 removedControllerNodeIPs.put(controllerID, curIP);
2359 addedControllerNodeIPs.put(controllerID, discoveredIP);
2360 }
2361 }
2362 }
2363 // Now figure out if rows have been deleted. We can't use the
2364 // rowKeys from rowsDeleted directly, since the tables primary
2365 // key is a compound that we can't disassemble
2366 Set<String> curEntries = curControllerNodeIPs.keySet();
2367 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2368 removedEntries.removeAll(curEntries);
2369 for (String removedControllerID : removedEntries)
2370 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2371 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2372 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2373 curControllerNodeIPs, addedControllerNodeIPs,
2374 removedControllerNodeIPs);
2375 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2376 try {
2377 this.updates.put(update);
2378 } catch (InterruptedException e) {
2379 log.error("Failure adding update to queue", e);
2380 }
2381 }
2382 }
2383 }
2384
2385 @Override
2386 public Map<String, String> getControllerNodeIPs() {
2387 // We return a copy of the mapping so we can guarantee that
2388 // the mapping return is the same as one that will be (or was)
2389 // dispatched to IHAListeners
2390 HashMap<String,String> retval = new HashMap<String,String>();
2391 synchronized(controllerNodeIPsCache) {
2392 retval.putAll(controllerNodeIPsCache);
2393 }
2394 return retval;
2395 }
2396
2397 @Override
2398 public void rowsModified(String tableName, Set<Object> rowKeys) {
2399 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2400 handleControllerNodeIPChanges();
2401 }
2402
2403 }
2404
2405 @Override
2406 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2407 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2408 handleControllerNodeIPChanges();
2409 }
2410 }
2411
2412 @Override
2413 public long getSystemStartTime() {
2414 return (this.systemStartTime);
2415 }
2416
2417 @Override
2418 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2419 this.alwaysClearFlowsOnSwAdd = value;
2420 }
2421
2422 public boolean getAlwaysClearFlowsOnSwAdd() {
2423 return this.alwaysClearFlowsOnSwAdd;
2424 }
2425}