blob: 1a9a9c5678fbd717cfafc545392974877adcd19e [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.INetMapStorage.DM_OPERATION;
58import net.floodlightcontroller.core.INetMapTopologyService.ITopoRouteService;
59import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080060import net.floodlightcontroller.core.IOFSwitch;
61import net.floodlightcontroller.core.IOFSwitchFilter;
62import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berde8557a462013-01-07 08:59:31 -080063import net.floodlightcontroller.core.ISwitchStorage.SwitchState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080064import net.floodlightcontroller.core.annotations.LogMessageDoc;
65import net.floodlightcontroller.core.annotations.LogMessageDocs;
66import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
67import net.floodlightcontroller.core.util.ListenerDispatcher;
68import net.floodlightcontroller.core.web.CoreWebRoutable;
69import net.floodlightcontroller.counter.ICounterStoreService;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -080070import net.floodlightcontroller.flowcache.IFlowService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080071import net.floodlightcontroller.packet.Ethernet;
72import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
73import net.floodlightcontroller.restserver.IRestApiService;
74import net.floodlightcontroller.storage.IResultSet;
75import net.floodlightcontroller.storage.IStorageSourceListener;
76import net.floodlightcontroller.storage.IStorageSourceService;
77import net.floodlightcontroller.storage.OperatorPredicate;
78import net.floodlightcontroller.storage.StorageException;
79import net.floodlightcontroller.threadpool.IThreadPoolService;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080080import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080081import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080082import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080083
84import org.jboss.netty.bootstrap.ServerBootstrap;
85import org.jboss.netty.buffer.ChannelBuffer;
86import org.jboss.netty.buffer.ChannelBuffers;
87import org.jboss.netty.channel.Channel;
88import org.jboss.netty.channel.ChannelHandlerContext;
89import org.jboss.netty.channel.ChannelPipelineFactory;
90import org.jboss.netty.channel.ChannelStateEvent;
91import org.jboss.netty.channel.ChannelUpstreamHandler;
92import org.jboss.netty.channel.Channels;
93import org.jboss.netty.channel.ExceptionEvent;
94import org.jboss.netty.channel.MessageEvent;
95import org.jboss.netty.channel.group.ChannelGroup;
96import org.jboss.netty.channel.group.DefaultChannelGroup;
97import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
98import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
99import org.jboss.netty.handler.timeout.IdleStateEvent;
100import org.jboss.netty.handler.timeout.ReadTimeoutException;
101import org.openflow.protocol.OFEchoReply;
102import org.openflow.protocol.OFError;
103import org.openflow.protocol.OFError.OFBadActionCode;
104import org.openflow.protocol.OFError.OFBadRequestCode;
105import org.openflow.protocol.OFError.OFErrorType;
106import org.openflow.protocol.OFError.OFFlowModFailedCode;
107import org.openflow.protocol.OFError.OFHelloFailedCode;
108import org.openflow.protocol.OFError.OFPortModFailedCode;
109import org.openflow.protocol.OFError.OFQueueOpFailedCode;
110import org.openflow.protocol.OFFeaturesReply;
111import org.openflow.protocol.OFGetConfigReply;
112import org.openflow.protocol.OFMessage;
113import org.openflow.protocol.OFPacketIn;
114import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800115import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800116import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800117import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800118import org.openflow.protocol.OFPortStatus.OFPortReason;
119import org.openflow.protocol.OFSetConfig;
120import org.openflow.protocol.OFStatisticsRequest;
121import org.openflow.protocol.OFSwitchConfig;
122import org.openflow.protocol.OFType;
123import org.openflow.protocol.OFVendor;
124import org.openflow.protocol.factory.BasicFactory;
125import org.openflow.protocol.factory.MessageParseException;
126import org.openflow.protocol.statistics.OFDescriptionStatistics;
127import org.openflow.protocol.statistics.OFStatistics;
128import org.openflow.protocol.statistics.OFStatisticsType;
129import org.openflow.protocol.vendor.OFBasicVendorDataType;
130import org.openflow.protocol.vendor.OFBasicVendorId;
131import org.openflow.protocol.vendor.OFVendorId;
132import org.openflow.util.HexString;
133import org.openflow.util.U16;
134import org.openflow.util.U32;
135import org.openflow.vendor.nicira.OFNiciraVendorData;
136import org.openflow.vendor.nicira.OFRoleReplyVendorData;
137import org.openflow.vendor.nicira.OFRoleRequestVendorData;
138import org.openflow.vendor.nicira.OFRoleVendorData;
139import org.slf4j.Logger;
140import org.slf4j.LoggerFactory;
141
142
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800143
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800144/**
145 * The main controller class. Handles all setup and network listeners
146 */
147public class Controller implements IFloodlightProviderService,
148 IStorageSourceListener {
Pankaj Berde15193092013-03-21 17:30:14 -0700149
150 protected SwitchStorageImpl swStore;;
Pankaj Berde8557a462013-01-07 08:59:31 -0800151
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800152 protected static Logger log = LoggerFactory.getLogger(Controller.class);
153
154 private static final String ERROR_DATABASE =
155 "The controller could not communicate with the system database.";
156
157 protected BasicFactory factory;
158 protected ConcurrentMap<OFType,
159 ListenerDispatcher<OFType,IOFMessageListener>>
160 messageListeners;
161 // The activeSwitches map contains only those switches that are actively
162 // being controlled by us -- it doesn't contain switches that are
163 // in the slave role
164 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
165 // connectedSwitches contains all connected switches, including ones where
166 // we're a slave controller. We need to keep track of them so that we can
167 // send role request messages to switches when our role changes to master
168 // We add a switch to this set after it successfully completes the
169 // handshake. Access to this Set needs to be synchronized with roleChanger
170 protected HashSet<OFSwitchImpl> connectedSwitches;
171
172 // The controllerNodeIPsCache maps Controller IDs to their IP address.
173 // It's only used by handleControllerNodeIPsChanged
174 protected HashMap<String, String> controllerNodeIPsCache;
175
176 protected Set<IOFSwitchListener> switchListeners;
177 protected Set<IHAListener> haListeners;
178 protected Map<String, List<IInfoProvider>> providerMap;
179 protected BlockingQueue<IUpdate> updates;
180
181 // Module dependencies
182 protected IRestApiService restApi;
183 protected ICounterStoreService counterStore = null;
184 protected IStorageSourceService storageSource;
185 protected IPktInProcessingTimeService pktinProcTime;
186 protected IThreadPoolService threadPool;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800187 protected IFlowService flowService;
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800188 protected ITopoRouteService topoRouteService;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800189 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800190
191 // Configuration options
192 protected int openFlowPort = 6633;
193 protected int workerThreads = 0;
194 // The id for this controller node. Should be unique for each controller
195 // node in a controller cluster.
196 protected String controllerId = "localhost";
197
198 // The current role of the controller.
199 // If the controller isn't configured to support roles, then this is null.
200 protected Role role;
201 // A helper that handles sending and timeout handling for role requests
202 protected RoleChanger roleChanger;
203
204 // Start time of the controller
205 protected long systemStartTime;
206
207 // Flag to always flush flow table on switch reconnect (HA or otherwise)
208 protected boolean alwaysClearFlowsOnSwAdd = false;
209
210 // Storage table names
211 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
212 protected static final String CONTROLLER_ID = "id";
213
214 protected static final String SWITCH_TABLE_NAME = "controller_switch";
215 protected static final String SWITCH_DATAPATH_ID = "dpid";
216 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
217 protected static final String SWITCH_IP = "ip";
218 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
219 protected static final String SWITCH_ACTIVE = "active";
220 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
221 protected static final String SWITCH_CAPABILITIES = "capabilities";
222 protected static final String SWITCH_BUFFERS = "buffers";
223 protected static final String SWITCH_TABLES = "tables";
224 protected static final String SWITCH_ACTIONS = "actions";
225
226 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
227 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
228
229 protected static final String PORT_TABLE_NAME = "controller_port";
230 protected static final String PORT_ID = "id";
231 protected static final String PORT_SWITCH = "switch_id";
232 protected static final String PORT_NUMBER = "number";
233 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
234 protected static final String PORT_NAME = "name";
235 protected static final String PORT_CONFIG = "config";
236 protected static final String PORT_STATE = "state";
237 protected static final String PORT_CURRENT_FEATURES = "current_features";
238 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
239 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
240 protected static final String PORT_PEER_FEATURES = "peer_features";
241
242 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
243 protected static final String CONTROLLER_INTERFACE_ID = "id";
244 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
245 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
246 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
247 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
248
249
250
251 // Perf. related configuration
252 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
253 protected static final int BATCH_MAX_SIZE = 100;
254 protected static final boolean ALWAYS_DECODE_ETH = true;
255
256 /**
257 * Updates handled by the main loop
258 */
259 protected interface IUpdate {
260 /**
261 * Calls the appropriate listeners
262 */
263 public void dispatch();
264 }
/** The kinds of switch event a {@code SwitchUpdate} can carry. */
public enum SwitchUpdateType {
    /** Delivered through {@code IOFSwitchListener.addedSwitch}. */
    ADDED,
    /** Delivered through {@code IOFSwitchListener.removedSwitch}. */
    REMOVED,
    /** Delivered through {@code IOFSwitchListener.switchPortChanged}. */
    PORTCHANGED
}
270 /**
271 * Update message indicating a switch was added or removed
272 */
273 protected class SwitchUpdate implements IUpdate {
274 public IOFSwitch sw;
275 public SwitchUpdateType switchUpdateType;
276 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
277 this.sw = sw;
278 this.switchUpdateType = switchUpdateType;
279 }
280 public void dispatch() {
281 if (log.isTraceEnabled()) {
282 log.trace("Dispatching switch update {} {}",
283 sw, switchUpdateType);
284 }
285 if (switchListeners != null) {
286 for (IOFSwitchListener listener : switchListeners) {
287 switch(switchUpdateType) {
288 case ADDED:
289 listener.addedSwitch(sw);
290 break;
291 case REMOVED:
292 listener.removedSwitch(sw);
293 break;
294 case PORTCHANGED:
295 listener.switchPortChanged(sw.getId());
296 break;
297 }
298 }
299 }
300 }
301 }
302
303 /**
304 * Update message indicating controller's role has changed
305 */
306 protected class HARoleUpdate implements IUpdate {
307 public Role oldRole;
308 public Role newRole;
309 public HARoleUpdate(Role newRole, Role oldRole) {
310 this.oldRole = oldRole;
311 this.newRole = newRole;
312 }
313 public void dispatch() {
314 // Make sure that old and new roles are different.
315 if (oldRole == newRole) {
316 if (log.isTraceEnabled()) {
317 log.trace("HA role update ignored as the old and " +
318 "new roles are the same. newRole = {}" +
319 "oldRole = {}", newRole, oldRole);
320 }
321 return;
322 }
323 if (log.isTraceEnabled()) {
324 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
325 newRole, oldRole);
326 }
327 if (haListeners != null) {
328 for (IHAListener listener : haListeners) {
329 listener.roleChanged(oldRole, newRole);
330 }
331 }
332 }
333 }
334
335 /**
336 * Update message indicating
337 * IPs of controllers in controller cluster have changed.
338 */
339 protected class HAControllerNodeIPUpdate implements IUpdate {
340 public Map<String,String> curControllerNodeIPs;
341 public Map<String,String> addedControllerNodeIPs;
342 public Map<String,String> removedControllerNodeIPs;
343 public HAControllerNodeIPUpdate(
344 HashMap<String,String> curControllerNodeIPs,
345 HashMap<String,String> addedControllerNodeIPs,
346 HashMap<String,String> removedControllerNodeIPs) {
347 this.curControllerNodeIPs = curControllerNodeIPs;
348 this.addedControllerNodeIPs = addedControllerNodeIPs;
349 this.removedControllerNodeIPs = removedControllerNodeIPs;
350 }
351 public void dispatch() {
352 if (log.isTraceEnabled()) {
353 log.trace("Dispatching HA Controller Node IP update "
354 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
355 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
356 removedControllerNodeIPs }
357 );
358 }
359 if (haListeners != null) {
360 for (IHAListener listener: haListeners) {
361 listener.controllerNodeIPsChanged(curControllerNodeIPs,
362 addedControllerNodeIPs, removedControllerNodeIPs);
363 }
364 }
365 }
366 }
367
368 // ***************
369 // Getters/Setters
370 // ***************
371
372 public void setStorageSourceService(IStorageSourceService storageSource) {
373 this.storageSource = storageSource;
374 }
375
376 public void setCounterStore(ICounterStoreService counterStore) {
377 this.counterStore = counterStore;
378 }
379
380 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
381 this.pktinProcTime = pits;
382 }
383
384 public void setRestApiService(IRestApiService restApi) {
385 this.restApi = restApi;
386 }
387
388 public void setThreadPoolService(IThreadPoolService tp) {
389 this.threadPool = tp;
390 }
391
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800392 public void setFlowService(IFlowService serviceImpl) {
393 this.flowService = serviceImpl;
394 }
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800395
396 public void setTopoRouteService(ITopoRouteService serviceImpl) {
397 this.topoRouteService = serviceImpl;
398 }
Jonathan Hartc2e95ee2013-02-22 15:25:11 -0800399
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800400 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800401 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800402 }
403
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800404 @Override
405 public Role getRole() {
406 synchronized(roleChanger) {
407 return role;
408 }
409 }
410
411 @Override
412 public void setRole(Role role) {
413 if (role == null) throw new NullPointerException("Role can not be null.");
414 if (role == Role.MASTER && this.role == Role.SLAVE) {
415 // Reset db state to Inactive for all switches.
416 updateAllInactiveSwitchInfo();
417 }
418
419 // Need to synchronize to ensure a reliable ordering on role request
420 // messages send and to ensure the list of connected switches is stable
421 // RoleChanger will handle the actual sending of the message and
422 // timeout handling
423 // @see RoleChanger
424 synchronized(roleChanger) {
425 if (role.equals(this.role)) {
426 log.debug("Ignoring role change: role is already {}", role);
427 return;
428 }
429
430 Role oldRole = this.role;
431 this.role = role;
432
433 log.debug("Submitting role change request to role {}", role);
434 roleChanger.submitRequest(connectedSwitches, role);
435
436 // Enqueue an update for our listeners.
437 try {
438 this.updates.put(new HARoleUpdate(role, oldRole));
439 } catch (InterruptedException e) {
440 log.error("Failure adding update to queue", e);
441 }
442 }
443 }
444
445
446
447 // **********************
448 // ChannelUpstreamHandler
449 // **********************
450
451 /**
452 * Return a new channel handler for processing a switch connections
453 * @param state The channel state object for the connection
454 * @return the new channel handler
455 */
456 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
457 return new OFChannelHandler(state);
458 }
459
Jonathan Hartcc957a02013-02-26 10:39:04 -0800460 protected class RoleChangeCallback implements ControlChangeCallback {
461 @Override
462 public void controlChanged(long dpid, boolean hasControl) {
463 log.info("Role change callback for switch {}, hasControl {}",
464 HexString.toHexString(dpid), hasControl);
465
466 synchronized(roleChanger){
467 OFSwitchImpl sw = null;
468 for (OFSwitchImpl connectedSw : connectedSwitches){
469 if (connectedSw.getId() == dpid){
470 sw = connectedSw;
471 break;
472 }
473 }
474 if (sw == null){
475 log.warn("Switch {} not found in connected switches",
476 HexString.toHexString(dpid));
477 return;
478 }
479
480 Role role = null;
481
Pankaj Berde01939e92013-03-08 14:38:27 -0800482 /*
483 * issue #229
484 * Cannot rely on sw.getRole() as it can be behind due to pending
485 * role changes in the queue. Just submit it and late the RoleChanger
486 * handle duplicates.
487 */
488
489 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800490 role = Role.MASTER;
491 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800492 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800493 role = Role.SLAVE;
494 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800495
496 log.debug("Sending role request {} msg to {}", role, sw);
497 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
498 swList.add(sw);
499 roleChanger.submitRequest(swList, role);
500
Jonathan Hartcc957a02013-02-26 10:39:04 -0800501 }
502
503 }
504 }
505
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800506 /**
507 * Channel handler deals with the switch connection and dispatches
508 * switch messages to the appropriate locations.
509 * @author readams
510 */
511 protected class OFChannelHandler
512 extends IdleStateAwareChannelUpstreamHandler {
513 protected OFSwitchImpl sw;
514 protected OFChannelState state;
515
516 public OFChannelHandler(OFChannelState state) {
517 this.state = state;
518 }
519
520 @Override
521 @LogMessageDoc(message="New switch connection from {ip address}",
522 explanation="A new switch has connected from the " +
523 "specified IP address")
524 public void channelConnected(ChannelHandlerContext ctx,
525 ChannelStateEvent e) throws Exception {
526 log.info("New switch connection from {}",
527 e.getChannel().getRemoteAddress());
528
529 sw = new OFSwitchImpl();
530 sw.setChannel(e.getChannel());
531 sw.setFloodlightProvider(Controller.this);
532 sw.setThreadPoolService(threadPool);
533
534 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
535 msglist.add(factory.getMessage(OFType.HELLO));
536 e.getChannel().write(msglist);
537
538 }
539
540 @Override
541 @LogMessageDoc(message="Disconnected switch {switch information}",
542 explanation="The specified switch has disconnected.")
543 public void channelDisconnected(ChannelHandlerContext ctx,
544 ChannelStateEvent e) throws Exception {
545 if (sw != null && state.hsState == HandshakeState.READY) {
546 if (activeSwitches.containsKey(sw.getId())) {
547 // It's safe to call removeSwitch even though the map might
548 // not contain this particular switch but another with the
549 // same DPID
550 removeSwitch(sw);
551 }
552 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700553 if (controlRequested) {
554 registryService.releaseControl(sw.getId());
555 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800556 connectedSwitches.remove(sw);
557 }
558 sw.setConnected(false);
559 }
560 log.info("Disconnected switch {}", sw);
561 }
562
/**
 * Central error handler for the switch channel. Logs a message for the
 * type of the failure and then closes the channel, with three exceptions:
 * an already-closed channel is ignored, a storage failure terminates the
 * controller, and a full work queue is only logged.
 */
@Override
@LogMessageDocs({
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to read timeout",
            explanation="The connected switch has failed to send any " +
                        "messages or respond to echo requests",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch}: failed to " +
                    "complete handshake",
            explanation="The switch did not respond correctly " +
                        "to handshake messages",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to IO Error: {}",
            explanation="There was an error communicating with the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to switch " +
                    "state error: {error}",
            explanation="The switch sent an unexpected message",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to " +
                    "message parse failure",
            explanation="Could not parse a message from the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Terminating controller due to storage exception",
            explanation=ERROR_DATABASE,
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Could not process message: queue full",
            explanation="OpenFlow messages are arriving faster than " +
                        " the controller can process them.",
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Error while processing message " +
                    "from switch {switch} {cause}",
            explanation="An error occurred processing the switch message",
            recommendation=LogMessageDoc.GENERIC_ACTION)
})
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    final Throwable cause = e.getCause();
    // Most failures end with the channel being closed; the branches that
    // must keep it open clear this flag.
    boolean closeChannel = true;

    if (cause instanceof ReadTimeoutException) {
        // switch timeout
        log.error("Disconnecting switch {} due to read timeout", sw);
    } else if (cause instanceof HandshakeTimeoutException) {
        log.error("Disconnecting switch {}: failed to complete handshake",
                  sw);
    } else if (cause instanceof ClosedChannelException) {
        // Channel is already closed: nothing to log, nothing to close.
        // This check must stay ahead of the IOException check because
        // ClosedChannelException is itself an IOException.
        closeChannel = false;
    } else if (cause instanceof IOException) {
        log.error("Disconnecting switch {} due to IO Error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof SwitchStateException) {
        log.error("Disconnecting switch {} due to switch state error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof MessageParseException) {
        log.error("Disconnecting switch " + sw +
                  " due to message parse failure",
                  cause);
    } else if (cause instanceof StorageException) {
        log.error("Terminating controller due to storage exception",
                  cause);
        terminate();
        closeChannel = false;
    } else if (cause instanceof RejectedExecutionException) {
        log.warn("Could not process message: queue full");
        closeChannel = false;
    } else {
        log.error("Error while processing message from switch " + sw,
                  cause);
    }

    if (closeChannel) {
        ctx.getChannel().close();
    }
}
642
643 @Override
644 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
645 throws Exception {
646 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
647 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
648 e.getChannel().write(msglist);
649 }
650
651 @Override
652 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
653 throws Exception {
654 if (e.getMessage() instanceof List) {
655 @SuppressWarnings("unchecked")
656 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
657
658 for (OFMessage ofm : msglist) {
659 try {
660 processOFMessage(ofm);
661 }
662 catch (Exception ex) {
663 // We are the last handler in the stream, so run the
664 // exception through the channel again by passing in
665 // ctx.getChannel().
666 Channels.fireExceptionCaught(ctx.getChannel(), ex);
667 }
668 }
669
670 // Flush all flow-mods/packet-out generated from this "train"
671 OFSwitchImpl.flush_all();
672 }
673 }
674
675 /**
676 * Process the request for the switch description
677 */
678 @LogMessageDoc(level="ERROR",
679 message="Exception in reading description " +
680 " during handshake {exception}",
681 explanation="Could not process the switch description string",
682 recommendation=LogMessageDoc.CHECK_SWITCH)
683 void processSwitchDescReply() {
684 try {
685 // Read description, if it has been updated
686 @SuppressWarnings("unchecked")
687 Future<List<OFStatistics>> desc_future =
688 (Future<List<OFStatistics>>)sw.
689 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
690 List<OFStatistics> values =
691 desc_future.get(0, TimeUnit.MILLISECONDS);
692 if (values != null) {
693 OFDescriptionStatistics description =
694 new OFDescriptionStatistics();
695 ChannelBuffer data =
696 ChannelBuffers.buffer(description.getLength());
697 for (OFStatistics f : values) {
698 f.writeTo(data);
699 description.readFrom(data);
700 break; // SHOULD be a list of length 1
701 }
702 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
703 description);
704 sw.setSwitchProperties(description);
705 data = null;
706
707 // At this time, also set other switch properties from storage
708 boolean is_core_switch = false;
709 IResultSet resultSet = null;
710 try {
711 String swid = sw.getStringId();
712 resultSet =
713 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
714 for (Iterator<IResultSet> it =
715 resultSet.iterator(); it.hasNext();) {
716 // In case of multiple rows, use the status
717 // in last row?
718 Map<String, Object> row = it.next().getRow();
719 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
720 if (log.isDebugEnabled()) {
721 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
722 "config for switch={}, is-core={}",
723 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
724 }
725 String ics =
726 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
727 is_core_switch = ics.equals("true");
728 }
729 }
730 }
731 finally {
732 if (resultSet != null)
733 resultSet.close();
734 }
735 if (is_core_switch) {
736 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
737 new Boolean(true));
738 }
739 }
740 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
741 state.hasDescription = true;
742 checkSwitchReady();
743 }
744 catch (InterruptedException ex) {
745 // Ignore
746 }
747 catch (TimeoutException ex) {
748 // Ignore
749 } catch (Exception ex) {
750 log.error("Exception in reading description " +
751 " during handshake", ex);
752 }
753 }
754
755 /**
756 * Send initial switch setup information that we need before adding
757 * the switch
758 * @throws IOException
759 */
760 void sendHelloConfiguration() throws IOException {
761 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700762 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800763 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
764 }
765
766 /**
767 * Send the configuration requests we can only do after we have
768 * the features reply
769 * @throws IOException
770 */
771 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700772 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800773 // Ensure we receive the full packet via PacketIn
774 OFSetConfig config = (OFSetConfig) factory
775 .getMessage(OFType.SET_CONFIG);
776 config.setMissSendLength((short) 0xffff)
777 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
778 sw.write(config, null);
779 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
780 null);
781
782 // Get Description to set switch-specific flags
783 OFStatisticsRequest req = new OFStatisticsRequest();
784 req.setStatisticType(OFStatisticsType.DESC);
785 req.setLengthU(req.getLengthU());
786 Future<List<OFStatistics>> dfuture =
787 sw.getStatistics(req);
788 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
789 dfuture);
790
791 }
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700792
793 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800794 protected void checkSwitchReady() {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700795
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800796 if (state.hsState == HandshakeState.FEATURES_REPLY &&
797 state.hasDescription && state.hasGetConfigReply) {
798
799 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700800 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800801
802 synchronized(roleChanger) {
803 // We need to keep track of all of the switches that are connected
804 // to the controller, in any role, so that we can later send the
805 // role request messages when the controller role changes.
806 // We need to be synchronized while doing this: we must not
807 // send a another role request to the connectedSwitches until
808 // we were able to add this new switch to connectedSwitches
809 // *and* send the current role to the new switch.
810 connectedSwitches.add(sw);
811
812 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800813 //Put the switch in SLAVE mode until we know we have control
814 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
815 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
816 swList.add(sw);
817 roleChanger.submitRequest(swList, Role.SLAVE);
818
Jonathan Hartcc957a02013-02-26 10:39:04 -0800819 //Request control of the switch from the global registry
820 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700821 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800822 registryService.requestControl(sw.getId(),
823 new RoleChangeCallback());
824 } catch (RegistryException e) {
825 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700826 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800827 }
828
Jonathan Hart97801ac2013-02-26 14:29:16 -0800829
Jonathan Hartcc957a02013-02-26 10:39:04 -0800830
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800831 // Send a role request if role support is enabled for the controller
832 // This is a probe that we'll use to determine if the switch
833 // actually supports the role request message. If it does we'll
834 // get back a role reply message. If it doesn't we'll get back an
835 // OFError message.
836 // If role is MASTER we will promote switch to active
837 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800838 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800839 log.debug("This controller's role is {}, " +
840 "sending initial role request msg to {}",
841 role, sw);
842 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
843 swList.add(sw);
844 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800845 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800846 }
847 else {
848 // Role supported not enabled on controller (for now)
849 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800850 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800851 "not sending role request msg to {}",
852 role, sw);
853 // Need to clear FlowMods before we add the switch
854 // and dispatch updates otherwise we have a race condition.
855 sw.clearAllFlowMods();
856 addSwitch(sw);
857 state.firstRoleReplyReceived = true;
858 }
859 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700860 if (!controlRequested) {
861 // yield to allow other thread(s) to release control
862 try {
863 Thread.sleep(10);
864 } catch (InterruptedException e) {
865 // Ignore interruptions
866 }
867 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700868 log.debug("Closing {} because we weren't able to request control " +
869 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700870 sw.channel.close();
871 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800872 }
873 }
874
875 /* Handle a role reply message we received from the switch. Since
876 * netty serializes message dispatch we don't need to synchronize
877 * against other receive operations from the same switch, so no need
878 * to synchronize addSwitch(), removeSwitch() operations from the same
879 * connection.
880 * FIXME: However, when a switch with the same DPID connects we do
881 * need some synchronization. However, handling switches with same
882 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
883 * removedSwitch notification):1
884 *
885 */
886 @LogMessageDoc(level="ERROR",
887 message="Invalid role value in role reply message",
888 explanation="Was unable to set the HA role (master or slave) " +
889 "for the controller.",
890 recommendation=LogMessageDoc.CHECK_CONTROLLER)
891 protected void handleRoleReplyMessage(OFVendor vendorMessage,
892 OFRoleReplyVendorData roleReplyVendorData) {
893 // Map from the role code in the message to our role enum
894 int nxRole = roleReplyVendorData.getRole();
895 Role role = null;
896 switch (nxRole) {
897 case OFRoleVendorData.NX_ROLE_OTHER:
898 role = Role.EQUAL;
899 break;
900 case OFRoleVendorData.NX_ROLE_MASTER:
901 role = Role.MASTER;
902 break;
903 case OFRoleVendorData.NX_ROLE_SLAVE:
904 role = Role.SLAVE;
905 break;
906 default:
907 log.error("Invalid role value in role reply message");
908 sw.getChannel().close();
909 return;
910 }
911
912 log.debug("Handling role reply for role {} from {}. " +
913 "Controller's role is {} ",
914 new Object[] { role, sw, Controller.this.role}
915 );
916
917 sw.deliverRoleReply(vendorMessage.getXid(), role);
918
919 boolean isActive = activeSwitches.containsKey(sw.getId());
920 if (!isActive && sw.isActive()) {
921 // Transition from SLAVE to MASTER.
922
923 if (!state.firstRoleReplyReceived ||
924 getAlwaysClearFlowsOnSwAdd()) {
925 // This is the first role-reply message we receive from
926 // this switch or roles were disabled when the switch
927 // connected:
928 // Delete all pre-existing flows for new connections to
929 // the master
930 //
931 // FIXME: Need to think more about what the test should
932 // be for when we flush the flow-table? For example,
933 // if all the controllers are temporarily in the backup
934 // role (e.g. right after a failure of the master
935 // controller) at the point the switch connects, then
936 // all of the controllers will initially connect as
937 // backup controllers and not flush the flow-table.
938 // Then when one of them is promoted to master following
939 // the master controller election the flow-table
940 // will still not be flushed because that's treated as
941 // a failover event where we don't want to flush the
942 // flow-table. The end result would be that the flow
943 // table for a newly connected switch is never
944 // flushed. Not sure how to handle that case though...
945 sw.clearAllFlowMods();
946 log.debug("First role reply from master switch {}, " +
947 "clear FlowTable to active switch list",
948 HexString.toHexString(sw.getId()));
949 }
950
951 // Some switches don't seem to update us with port
952 // status messages while in slave role.
953 readSwitchPortStateFromStorage(sw);
954
955 // Only add the switch to the active switch list if
956 // we're not in the slave role. Note that if the role
957 // attribute is null, then that means that the switch
958 // doesn't support the role request messages, so in that
959 // case we're effectively in the EQUAL role and the
960 // switch should be included in the active switch list.
961 addSwitch(sw);
962 log.debug("Added master switch {} to active switch list",
963 HexString.toHexString(sw.getId()));
964
965 }
966 else if (isActive && !sw.isActive()) {
967 // Transition from MASTER to SLAVE: remove switch
968 // from active switch list.
969 log.debug("Removed slave switch {} from active switch" +
970 " list", HexString.toHexString(sw.getId()));
971 removeSwitch(sw);
972 }
973
974 // Indicate that we have received a role reply message.
975 state.firstRoleReplyReceived = true;
976 }
977
978 protected boolean handleVendorMessage(OFVendor vendorMessage) {
979 boolean shouldHandleMessage = false;
980 int vendor = vendorMessage.getVendor();
981 switch (vendor) {
982 case OFNiciraVendorData.NX_VENDOR_ID:
983 OFNiciraVendorData niciraVendorData =
984 (OFNiciraVendorData)vendorMessage.getVendorData();
985 int dataType = niciraVendorData.getDataType();
986 switch (dataType) {
987 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
988 OFRoleReplyVendorData roleReplyVendorData =
989 (OFRoleReplyVendorData) niciraVendorData;
990 handleRoleReplyMessage(vendorMessage,
991 roleReplyVendorData);
992 break;
993 default:
994 log.warn("Unhandled Nicira VENDOR message; " +
995 "data type = {}", dataType);
996 break;
997 }
998 break;
999 default:
1000 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
1001 break;
1002 }
1003
1004 return shouldHandleMessage;
1005 }
1006
/**
 * Dispatch an OpenFlow message from a switch to the appropriate
 * handler.
 *
 * Handshake and housekeeping messages (HELLO, ECHO, FEATURES_REPLY,
 * GET_CONFIG_REPLY, VENDOR, ERROR, STATS_REPLY) are consumed here.
 * PORT_STATUS and every message type without a dedicated case are
 * additionally passed to handleMessage(), but only when the switch is
 * connected, the handshake has reached READY and the switch role is not
 * SLAVE.
 *
 * @param m The message to process
 * @throws IOException propagated from the handshake writes and from
 *         handleMessage()
 * @throws SwitchStateException if a HELLO, GET_CONFIG_REPLY or
 *         STATS_REPLY arrives in a handshake state that does not allow it
 */
@LogMessageDocs({
    @LogMessageDoc(level="WARN",
            message="Config Reply from {switch} has " +
                    "miss length set to {length}",
            explanation="The controller requires that the switch " +
                    "use a miss length of 0xffff for correct " +
                    "function",
            recommendation="Use a different switch to ensure " +
                    "correct function"),
    @LogMessageDoc(level="WARN",
            message="Received ERROR from sw {switch} that "
                    +"indicates roles are not supported "
                    +"but we have received a valid "
                    +"role reply earlier",
            explanation="The switch sent a confusing message to the" +
                    "controller")
})
protected void processOFMessage(OFMessage m)
        throws IOException, SwitchStateException {
    // Set by the cases below when the message must also go to the
    // registered message listeners.
    boolean shouldHandleMessage = false;

    switch (m.getType()) {
        case HELLO:
            if (log.isTraceEnabled())
                log.trace("HELLO from {}", sw);

            // HELLO is only legal as the very first message
            if (state.hsState.equals(HandshakeState.START)) {
                state.hsState = HandshakeState.HELLO;
                sendHelloConfiguration();
            } else {
                throw new SwitchStateException("Unexpected HELLO from "
                                               + sw);
            }
            break;
        case ECHO_REQUEST:
            // Answer with an ECHO_REPLY carrying the request's xid
            OFEchoReply reply =
                (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
            reply.setXid(m.getXid());
            sw.write(reply, null);
            break;
        case ECHO_REPLY:
            break;
        case FEATURES_REPLY:
            if (log.isTraceEnabled())
                log.trace("Features Reply from {}", sw);

            sw.setFeaturesReply((OFFeaturesReply) m);
            if (state.hsState.equals(HandshakeState.HELLO)) {
                // First features reply: continue the handshake
                sendFeatureReplyConfiguration();
                state.hsState = HandshakeState.FEATURES_REPLY;
                // uncomment to enable "dumb" switches like cbench
                // state.hsState = HandshakeState.READY;
                // addSwitch(sw);
            } else {
                // return results to rest api caller
                sw.deliverOFFeaturesReply(m);
                // update database
                updateActiveSwitchInfo(sw);
            }
            break;
        case GET_CONFIG_REPLY:
            if (log.isTraceEnabled())
                log.trace("Get config reply from {}", sw);

            if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
                String em = "Unexpected GET_CONFIG_REPLY from " + sw;
                throw new SwitchStateException(em);
            }
            OFGetConfigReply cr = (OFGetConfigReply) m;
            // The miss-send length is compared as a signed short; it is
            // masked to its unsigned value only for the warning text.
            if (cr.getMissSendLength() == (short)0xffff) {
                log.trace("Config Reply from {} confirms " +
                          "miss length set to 0xffff", sw);
            } else {
                log.warn("Config Reply from {} has " +
                         "miss length set to {}",
                         sw, cr.getMissSendLength() & 0xffff);
            }
            state.hasGetConfigReply = true;
            checkSwitchReady();
            break;
        case VENDOR:
            shouldHandleMessage = handleVendorMessage((OFVendor)m);
            break;
        case ERROR:
            log.debug("Recieved ERROR message from switch {}: {}", sw, m);
            // TODO: we need better error handling. Especially for
            // request/reply style message (stats, roles) we should have
            // a unified way to lookup the xid in the error message.
            // This will probably involve rewriting the way we handle
            // request/reply style messages.
            OFError error = (OFError) m;
            boolean shouldLogError = true;
            // TODO: should we check that firstRoleReplyReceived is false,
            // i.e., check only whether the first request fails?
            if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
                // Note: despite its name this flag tests for the
                // BAD_REQUEST error *type*; see the rationale below.
                boolean isBadVendorError =
                    (error.getErrorType() == OFError.OFErrorType.
                            OFPET_BAD_REQUEST.getValue());
                // We expect to receive a bad vendor error when
                // we're connected to a switch that doesn't support
                // the Nicira vendor extensions (i.e. not OVS or
                // derived from OVS).  By protocol, it should also be
                // BAD_VENDOR, but too many switch implementations
                // get it wrong and we can already check the xid()
                // so we can ignore the type with confidence that this
                // is not a spurious error
                shouldLogError = !isBadVendorError;
                if (isBadVendorError) {
                    log.debug("Handling bad vendor error for {}", sw);
                    if (state.firstRoleReplyReceived && (role != null)) {
                        log.warn("Received ERROR from sw {} that "
                                  +"indicates roles are not supported "
                                  +"but we have received a valid "
                                  +"role reply earlier", sw);
                    }
                    state.firstRoleReplyReceived = true;
                    Role requestedRole =
                            sw.deliverRoleRequestNotSupported(error.getXid());
                    synchronized(roleChanger) {
                        if (sw.role == null && Controller.this.role==Role.SLAVE) {
                            // This will now never happen. The Controller's role
                            // is now never SLAVE, always MASTER.
                            // the switch doesn't understand role request
                            // messages and the current controller role is
                            // slave. We need to disconnect the switch.
                            // @see RoleChanger for rationale
                            log.warn("Closing {} channel because controller's role " +
                                    "is SLAVE", sw);
                            sw.getChannel().close();
                        }
                        else if (sw.role == null && requestedRole == Role.MASTER) {
                            log.debug("Adding switch {} because we got an error" +
                                    " returned from a MASTER role request", sw);
                            // Controller's role is master: add to
                            // active
                            // TODO: check if clearing flow table is
                            // right choice here.
                            // Need to clear FlowMods before we add the switch
                            // and dispatch updates otherwise we have a race condition.
                            // TODO: switch update is async. Won't we still have a potential
                            //       race condition?
                            sw.clearAllFlowMods();
                            addSwitch(sw);
                        }
                    }
                }
                else {
                    // TODO: Is this the right thing to do if we receive
                    // some other error besides a bad vendor error?
                    // Presumably that means the switch did actually
                    // understand the role request message, but there
                    // was some other error from processing the message.
                    // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
                    // error code, but it doesn't look like the Nicira
                    // role request has that. Should check OVS source
                    // code to see if it's possible for any other errors
                    // to be returned.
                    // If we received an error the switch is not
                    // in the correct role, so we need to disconnect it.
                    // We could also resend the request but then we need to
                    // check if there are other pending request in which
                    // case we shouldn't resend. If we do resend we need
                    // to make sure that the switch eventually accepts one
                    // of our requests or disconnect the switch. This feels
                    // cumbersome.
                    log.debug("Closing {} channel because we recieved an " +
                            "error other than BAD_VENDOR", sw);
                    sw.getChannel().close();
                }
            }
            // Once we support OF 1.2, we'd add code to handle it here.
            //if (error.getXid() == state.ofRoleRequestXid) {
            //}
            if (shouldLogError)
                logError(sw, error);
            break;
        case STATS_REPLY:
            // Statistics replies are only accepted once the features
            // reply has been processed
            if (state.hsState.ordinal() <
                HandshakeState.FEATURES_REPLY.ordinal()) {
                String em = "Unexpected STATS_REPLY from " + sw;
                throw new SwitchStateException(em);
            }
            sw.deliverStatisticsReply(m);
            if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
                processSwitchDescReply();
            }
            break;
        case PORT_STATUS:
            // We want to update our port state info even if we're in
            // the slave role, but we only want to update storage if
            // we're the master (or equal).
            boolean updateStorage = state.hsState.
                                        equals(HandshakeState.READY) &&
                                        (sw.getRole() != Role.SLAVE);
            handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
            shouldHandleMessage = true;
            break;

        default:
            shouldHandleMessage = true;
            break;
    }

    if (shouldHandleMessage) {
        // Listener dispatch happens under the switch's listener read lock
        sw.getListenerReadLock().lock();
        try {
            if (sw.isConnected()) {
                if (!state.hsState.equals(HandshakeState.READY)) {
                    log.debug("Ignoring message type {} received " +
                              "from switch {} before switch is " +
                              "fully configured.", m.getType(), sw);
                }
                // Check if the controller is in the slave role for the
                // switch. If it is, then don't dispatch the message to
                // the listeners.
                // TODO: Should we dispatch messages that we expect to
                // receive when we're in the slave role, e.g. port
                // status messages? Since we're "hiding" switches from
                // the listeners when we're in the slave role, then it
                // seems a little weird to dispatch port status messages
                // to them. On the other hand there might be special
                // modules that care about all of the connected switches
                // and would like to receive port status notifications.
                else if (sw.getRole() == Role.SLAVE) {
                    // Don't log message if it's a port status message
                    // since we expect to receive those from the switch
                    // and don't want to emit spurious messages.
                    if (m.getType() != OFType.PORT_STATUS) {
                        log.debug("Ignoring message type {} received " +
                                "from switch {} while in the slave role.",
                                m.getType(), sw);
                    }
                } else {
                    handleMessage(sw, m, null);
                }
            }
        }
        finally {
            sw.getListenerReadLock().unlock();
        }
    }
}
1256 }
1257
1258 // ****************
1259 // Message handlers
1260 // ****************
1261
1262 protected void handlePortStatusMessage(IOFSwitch sw,
1263 OFPortStatus m,
1264 boolean updateStorage) {
1265 short portNumber = m.getDesc().getPortNumber();
1266 OFPhysicalPort port = m.getDesc();
1267 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001268 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1269 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001270 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001271 if (!portDown) {
Pankaj Berde6debb042013-01-16 18:04:32 -08001272 swStore.addPort(sw.getStringId(), port);
1273 } else {
1274 swStore.deletePort(sw.getStringId(), port.getPortNumber());
1275 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001276 if (updateStorage)
1277 updatePortInfo(sw, port);
1278 log.debug("Port #{} modified for {}", portNumber, sw);
1279 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1280 sw.setPort(port);
Pankaj Berde8557a462013-01-07 08:59:31 -08001281 swStore.addPort(sw.getStringId(), port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001282 if (updateStorage)
1283 updatePortInfo(sw, port);
1284 log.debug("Port #{} added for {}", portNumber, sw);
1285 } else if (m.getReason() ==
1286 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1287 sw.deletePort(portNumber);
Pankaj Berde8557a462013-01-07 08:59:31 -08001288 swStore.deletePort(sw.getStringId(), portNumber);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001289 if (updateStorage)
1290 removePortInfo(sw, portNumber);
1291 log.debug("Port #{} deleted for {}", portNumber, sw);
1292 }
1293 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1294 try {
1295 this.updates.put(update);
1296 } catch (InterruptedException e) {
1297 log.error("Failure adding update to queue", e);
1298 }
1299 }
1300
/**
 * flcontext_cache - Keep a thread local stack of contexts.
 *
 * Each thread lazily gets its own empty stack; flcontext_alloc() pops
 * from it and flcontext_free() pushes cleared contexts back onto it.
 */
protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
    new ThreadLocal <Stack<FloodlightContext>> () {
        @Override
        protected Stack<FloodlightContext> initialValue() {
            // Start every thread with an empty pool
            return new Stack<FloodlightContext>();
        }
    };
1311
1312 /**
1313 * flcontext_alloc - pop a context off the stack, if required create a new one
1314 * @return FloodlightContext
1315 */
1316 protected static FloodlightContext flcontext_alloc() {
1317 FloodlightContext flcontext = null;
1318
1319 if (flcontext_cache.get().empty()) {
1320 flcontext = new FloodlightContext();
1321 }
1322 else {
1323 flcontext = flcontext_cache.get().pop();
1324 }
1325
1326 return flcontext;
1327 }
1328
1329 /**
1330 * flcontext_free - Free the context to the current thread
1331 * @param flcontext
1332 */
1333 protected void flcontext_free(FloodlightContext flcontext) {
1334 flcontext.getStorage().clear();
1335 flcontext_cache.get().push(flcontext);
1336 }
1337
1338 /**
1339 * Handle replies to certain OFMessages, and pass others off to listeners
1340 * @param sw The switch for the message
1341 * @param m The message
1342 * @param bContext The floodlight context. If null then floodlight context would
1343 * be allocated in this function
1344 * @throws IOException
1345 */
1346 @LogMessageDocs({
1347 @LogMessageDoc(level="ERROR",
1348 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1349 " field is empty.",
1350 explanation="The switch sent an improperly-formatted PacketIn" +
1351 " message",
1352 recommendation=LogMessageDoc.CHECK_SWITCH),
1353 @LogMessageDoc(level="WARN",
1354 message="Unhandled OF Message: {} from {}",
1355 explanation="The switch sent a message not handled by " +
1356 "the controller")
1357 })
1358 protected void handleMessage(IOFSwitch sw, OFMessage m,
1359 FloodlightContext bContext)
1360 throws IOException {
1361 Ethernet eth = null;
1362
1363 switch (m.getType()) {
1364 case PACKET_IN:
1365 OFPacketIn pi = (OFPacketIn)m;
1366
1367 if (pi.getPacketData().length <= 0) {
1368 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1369 ") because the data field is empty.");
1370 return;
1371 }
1372
1373 if (Controller.ALWAYS_DECODE_ETH) {
1374 eth = new Ethernet();
1375 eth.deserialize(pi.getPacketData(), 0,
1376 pi.getPacketData().length);
1377 counterStore.updatePacketInCounters(sw, m, eth);
1378 }
1379 // fall through to default case...
1380
1381 default:
1382
1383 List<IOFMessageListener> listeners = null;
1384 if (messageListeners.containsKey(m.getType())) {
1385 listeners = messageListeners.get(m.getType()).
1386 getOrderedListeners();
1387 }
1388
1389 FloodlightContext bc = null;
1390 if (listeners != null) {
1391 // Check if floodlight context is passed from the calling
1392 // function, if so use that floodlight context, otherwise
1393 // allocate one
1394 if (bContext == null) {
1395 bc = flcontext_alloc();
1396 } else {
1397 bc = bContext;
1398 }
1399 if (eth != null) {
1400 IFloodlightProviderService.bcStore.put(bc,
1401 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1402 eth);
1403 }
1404
1405 // Get the starting time (overall and per-component) of
1406 // the processing chain for this packet if performance
1407 // monitoring is turned on
1408 pktinProcTime.bootstrap(listeners);
1409 pktinProcTime.recordStartTimePktIn();
1410 Command cmd;
1411 for (IOFMessageListener listener : listeners) {
1412 if (listener instanceof IOFSwitchFilter) {
1413 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1414 continue;
1415 }
1416 }
1417
1418 pktinProcTime.recordStartTimeComp(listener);
1419 cmd = listener.receive(sw, m, bc);
1420 pktinProcTime.recordEndTimeComp(listener);
1421
1422 if (Command.STOP.equals(cmd)) {
1423 break;
1424 }
1425 }
1426 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1427 } else {
1428 log.warn("Unhandled OF Message: {} from {}", m, sw);
1429 }
1430
1431 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1432 }
1433 }
1434
1435 /**
1436 * Log an OpenFlow error message from a switch
1437 * @param sw The switch that sent the error
1438 * @param error The error message
1439 */
1440 @LogMessageDoc(level="ERROR",
1441 message="Error {error type} {error code} from {switch}",
1442 explanation="The switch responded with an unexpected error" +
1443 "to an OpenFlow message from the controller",
1444 recommendation="This could indicate improper network operation. " +
1445 "If the problem persists restarting the switch and " +
1446 "controller may help."
1447 )
1448 protected void logError(IOFSwitch sw, OFError error) {
1449 int etint = 0xffff & error.getErrorType();
1450 if (etint < 0 || etint >= OFErrorType.values().length) {
1451 log.error("Unknown error code {} from sw {}", etint, sw);
1452 }
1453 OFErrorType et = OFErrorType.values()[etint];
1454 switch (et) {
1455 case OFPET_HELLO_FAILED:
1456 OFHelloFailedCode hfc =
1457 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1458 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1459 break;
1460 case OFPET_BAD_REQUEST:
1461 OFBadRequestCode brc =
1462 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1463 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1464 break;
1465 case OFPET_BAD_ACTION:
1466 OFBadActionCode bac =
1467 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1468 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1469 break;
1470 case OFPET_FLOW_MOD_FAILED:
1471 OFFlowModFailedCode fmfc =
1472 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1473 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1474 break;
1475 case OFPET_PORT_MOD_FAILED:
1476 OFPortModFailedCode pmfc =
1477 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1478 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1479 break;
1480 case OFPET_QUEUE_OP_FAILED:
1481 OFQueueOpFailedCode qofc =
1482 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1483 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1484 break;
1485 default:
1486 break;
1487 }
1488 }
1489
1490 /**
1491 * Add a switch to the active switch list and call the switch listeners.
1492 * This happens either when a switch first connects (and the controller is
1493 * not in the slave role) or when the role of the controller changes from
1494 * slave to master.
1495 * @param sw the switch that has been added
1496 */
1497 // TODO: need to rethink locking and the synchronous switch update.
1498 // We can / should also handle duplicate DPIDs in connectedSwitches
1499 @LogMessageDoc(level="ERROR",
1500 message="New switch added {switch} for already-added switch {switch}",
1501 explanation="A switch with the same DPID as another switch " +
1502 "connected to the controller. This can be caused by " +
1503 "multiple switches configured with the same DPID, or " +
1504 "by a switch reconnected very quickly after " +
1505 "disconnecting.",
1506 recommendation="If this happens repeatedly, it is likely there " +
1507 "are switches with duplicate DPIDs on the network. " +
1508 "Reconfigure the appropriate switches. If it happens " +
1509 "very rarely, then it is likely this is a transient " +
1510 "network problem that can be ignored."
1511 )
1512 protected void addSwitch(IOFSwitch sw) {
1513 // TODO: is it safe to modify the HashMap without holding
1514 // the old switch's lock?
1515 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1516 if (sw == oldSw) {
1517 // Note == for object equality, not .equals for value
1518 log.info("New add switch for pre-existing switch {}", sw);
1519 return;
1520 }
1521
1522 if (oldSw != null) {
1523 oldSw.getListenerWriteLock().lock();
1524 try {
1525 log.error("New switch added {} for already-added switch {}",
1526 sw, oldSw);
1527 // Set the connected flag to false to suppress calling
1528 // the listeners for this switch in processOFMessage
1529 oldSw.setConnected(false);
1530
1531 oldSw.cancelAllStatisticsReplies();
1532
1533 updateInactiveSwitchInfo(oldSw);
1534
1535 // we need to clean out old switch state definitively
1536 // before adding the new switch
1537 // FIXME: It seems not completely kosher to call the
1538 // switch listeners here. I thought one of the points of
1539 // having the asynchronous switch update mechanism was so
1540 // the addedSwitch and removedSwitch were always called
1541 // from a single thread to simplify concurrency issues
1542 // for the listener.
1543 if (switchListeners != null) {
1544 for (IOFSwitchListener listener : switchListeners) {
1545 listener.removedSwitch(oldSw);
1546 }
1547 }
1548 // will eventually trigger a removeSwitch(), which will cause
1549 // a "Not removing Switch ... already removed debug message.
1550 // TODO: Figure out a way to handle this that avoids the
1551 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001552 log.debug("Closing {} because a new IOFSwitch got added " +
1553 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001554 oldSw.getChannel().close();
1555 }
1556 finally {
1557 oldSw.getListenerWriteLock().unlock();
1558 }
1559 }
1560
1561 updateActiveSwitchInfo(sw);
Pankaj Berdef8ad2852013-02-27 17:06:14 -08001562 if (registryService.hasControl(sw.getId())) {
1563 swStore.update(sw.getStringId(), SwitchState.ACTIVE, DM_OPERATION.UPDATE);
1564 for (OFPhysicalPort port: sw.getPorts()) {
1565 swStore.addPort(sw.getStringId(), port);
1566 }
Pankaj Berde0fc4e432013-01-12 09:47:22 -08001567 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001568 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1569 try {
1570 this.updates.put(update);
1571 } catch (InterruptedException e) {
1572 log.error("Failure adding update to queue", e);
1573 }
1574 }
1575
1576 /**
1577 * Remove a switch from the active switch list and call the switch listeners.
1578 * This happens either when the switch is disconnected or when the
1579 * controller's role for the switch changes from master to slave.
1580 * @param sw the switch that has been removed
1581 */
1582 protected void removeSwitch(IOFSwitch sw) {
1583 // No need to acquire the listener lock, since
1584 // this method is only called after netty has processed all
1585 // pending messages
1586 log.debug("removeSwitch: {}", sw);
Pankaj Berdefa4d0f72013-03-13 17:59:37 -07001587 //
1588 // Cannot set sw to inactive in network map due to race condition
1589 // Need a cleanup thread to periodically check switches not active in registry
1590 // and acquire control to set to inactive state in network map and release it
1591 //
1592 // if (registryService.hasControl(sw.getId())) {
1593 // swStore.update(sw.getStringId(), SwitchState.INACTIVE, DM_OPERATION.UPDATE);
1594 // }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001595 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1596 log.debug("Not removing switch {}; already removed", sw);
1597 return;
1598 }
1599 // We cancel all outstanding statistics replies if the switch transition
1600 // from active. In the future we might allow statistics requests
1601 // from slave controllers. Then we need to move this cancelation
1602 // to switch disconnect
1603 sw.cancelAllStatisticsReplies();
Pankaj Berdeafb20532013-01-08 15:05:24 -08001604
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001605
1606 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1607 // here if role support is enabled. In that case if the switch is being
1608 // removed because we've been switched to being in the slave role, then I think
1609 // it's possible that the new master may have already been promoted to master
1610 // and written out the active switch state to storage. If we now execute
1611 // updateInactiveSwitchInfo we may wipe out all of the state that was
1612 // written out by the new master. Maybe need to revisit how we handle all
1613 // of the switch state that's written to storage.
1614
1615 updateInactiveSwitchInfo(sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001616
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001617 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1618 try {
1619 this.updates.put(update);
1620 } catch (InterruptedException e) {
1621 log.error("Failure adding update to queue", e);
1622 }
1623 }
1624
1625 // ***************
1626 // IFloodlightProvider
1627 // ***************
1628
1629 @Override
1630 public synchronized void addOFMessageListener(OFType type,
1631 IOFMessageListener listener) {
1632 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1633 messageListeners.get(type);
1634 if (ldd == null) {
1635 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1636 messageListeners.put(type, ldd);
1637 }
1638 ldd.addListener(type, listener);
1639 }
1640
1641 @Override
1642 public synchronized void removeOFMessageListener(OFType type,
1643 IOFMessageListener listener) {
1644 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1645 messageListeners.get(type);
1646 if (ldd != null) {
1647 ldd.removeListener(listener);
1648 }
1649 }
1650
1651 private void logListeners() {
1652 for (Map.Entry<OFType,
1653 ListenerDispatcher<OFType,
1654 IOFMessageListener>> entry
1655 : messageListeners.entrySet()) {
1656
1657 OFType type = entry.getKey();
1658 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1659 entry.getValue();
1660
1661 StringBuffer sb = new StringBuffer();
1662 sb.append("OFListeners for ");
1663 sb.append(type);
1664 sb.append(": ");
1665 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1666 sb.append(l.getName());
1667 sb.append(",");
1668 }
1669 log.debug(sb.toString());
1670 }
1671 }
1672
1673 public void removeOFMessageListeners(OFType type) {
1674 messageListeners.remove(type);
1675 }
1676
1677 @Override
1678 public Map<Long, IOFSwitch> getSwitches() {
1679 return Collections.unmodifiableMap(this.activeSwitches);
1680 }
1681
1682 @Override
1683 public void addOFSwitchListener(IOFSwitchListener listener) {
1684 this.switchListeners.add(listener);
1685 }
1686
1687 @Override
1688 public void removeOFSwitchListener(IOFSwitchListener listener) {
1689 this.switchListeners.remove(listener);
1690 }
1691
1692 @Override
1693 public Map<OFType, List<IOFMessageListener>> getListeners() {
1694 Map<OFType, List<IOFMessageListener>> lers =
1695 new HashMap<OFType, List<IOFMessageListener>>();
1696 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1697 messageListeners.entrySet()) {
1698 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1699 }
1700 return Collections.unmodifiableMap(lers);
1701 }
1702
1703 @Override
1704 @LogMessageDocs({
1705 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1706 "a null switch",
1707 explanation="Failed to process a message because the switch " +
1708 " is no longer connected."),
1709 @LogMessageDoc(level="ERROR",
1710 message="Error reinjecting OFMessage on switch {switch}",
1711 explanation="An I/O error occured while attempting to " +
1712 "process an OpenFlow message",
1713 recommendation=LogMessageDoc.CHECK_SWITCH)
1714 })
1715 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1716 FloodlightContext bc) {
1717 if (sw == null) {
1718 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1719 return false;
1720 }
1721
1722 // FIXME: Do we need to be able to inject messages to switches
1723 // where we're the slave controller (i.e. they're connected but
1724 // not active)?
1725 // FIXME: Don't we need synchronization logic here so we're holding
1726 // the listener read lock when we call handleMessage? After some
1727 // discussions it sounds like the right thing to do here would be to
1728 // inject the message as a netty upstream channel event so it goes
1729 // through the normal netty event processing, including being
1730 // handled
1731 if (!activeSwitches.containsKey(sw.getId())) return false;
1732
1733 try {
1734 // Pass Floodlight context to the handleMessages()
1735 handleMessage(sw, msg, bc);
1736 } catch (IOException e) {
1737 log.error("Error reinjecting OFMessage on switch {}",
1738 HexString.toHexString(sw.getId()));
1739 return false;
1740 }
1741 return true;
1742 }
1743
1744 @Override
1745 @LogMessageDoc(message="Calling System.exit",
1746 explanation="The controller is terminating")
1747 public synchronized void terminate() {
1748 log.info("Calling System.exit");
1749 System.exit(1);
1750 }
1751
1752 @Override
1753 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1754 // call the overloaded version with floodlight context set to null
1755 return injectOfMessage(sw, msg, null);
1756 }
1757
1758 @Override
1759 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1760 FloodlightContext bc) {
1761 if (log.isTraceEnabled()) {
1762 String str = OFMessage.getDataAsString(sw, m, bc);
1763 log.trace("{}", str);
1764 }
1765
1766 List<IOFMessageListener> listeners = null;
1767 if (messageListeners.containsKey(m.getType())) {
1768 listeners =
1769 messageListeners.get(m.getType()).getOrderedListeners();
1770 }
1771
1772 if (listeners != null) {
1773 for (IOFMessageListener listener : listeners) {
1774 if (listener instanceof IOFSwitchFilter) {
1775 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1776 continue;
1777 }
1778 }
1779 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1780 break;
1781 }
1782 }
1783 }
1784 }
1785
1786 @Override
1787 public BasicFactory getOFMessageFactory() {
1788 return factory;
1789 }
1790
1791 @Override
1792 public String getControllerId() {
1793 return controllerId;
1794 }
1795
1796 // **************
1797 // Initialization
1798 // **************
1799
1800 protected void updateAllInactiveSwitchInfo() {
1801 if (role == Role.SLAVE) {
1802 return;
1803 }
1804 String controllerId = getControllerId();
1805 String[] switchColumns = { SWITCH_DATAPATH_ID,
1806 SWITCH_CONTROLLER_ID,
1807 SWITCH_ACTIVE };
1808 String[] portColumns = { PORT_ID, PORT_SWITCH };
1809 IResultSet switchResultSet = null;
1810 try {
1811 OperatorPredicate op =
1812 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1813 OperatorPredicate.Operator.EQ,
1814 controllerId);
1815 switchResultSet =
1816 storageSource.executeQuery(SWITCH_TABLE_NAME,
1817 switchColumns,
1818 op, null);
1819 while (switchResultSet.next()) {
1820 IResultSet portResultSet = null;
1821 try {
1822 String datapathId =
1823 switchResultSet.getString(SWITCH_DATAPATH_ID);
1824 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1825 op = new OperatorPredicate(PORT_SWITCH,
1826 OperatorPredicate.Operator.EQ,
1827 datapathId);
1828 portResultSet =
1829 storageSource.executeQuery(PORT_TABLE_NAME,
1830 portColumns,
1831 op, null);
1832 while (portResultSet.next()) {
1833 portResultSet.deleteRow();
1834 }
1835 portResultSet.save();
1836 }
1837 finally {
1838 if (portResultSet != null)
1839 portResultSet.close();
1840 }
1841 }
1842 switchResultSet.save();
1843 }
1844 finally {
1845 if (switchResultSet != null)
1846 switchResultSet.close();
1847 }
1848 }
1849
1850 protected void updateControllerInfo() {
1851 updateAllInactiveSwitchInfo();
1852
1853 // Write out the controller info to the storage source
1854 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1855 String id = getControllerId();
1856 controllerInfo.put(CONTROLLER_ID, id);
1857 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1858 }
1859
1860 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1861 if (role == Role.SLAVE) {
1862 return;
1863 }
1864 // Obtain the row info for the switch
1865 Map<String, Object> switchInfo = new HashMap<String, Object>();
1866 String datapathIdString = sw.getStringId();
1867 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1868 String controllerId = getControllerId();
1869 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1870 Date connectedSince = sw.getConnectedSince();
1871 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1872 Channel channel = sw.getChannel();
1873 SocketAddress socketAddress = channel.getRemoteAddress();
1874 if (socketAddress != null) {
1875 String socketAddressString = socketAddress.toString();
1876 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1877 if (socketAddress instanceof InetSocketAddress) {
1878 InetSocketAddress inetSocketAddress =
1879 (InetSocketAddress)socketAddress;
1880 InetAddress inetAddress = inetSocketAddress.getAddress();
1881 String ip = inetAddress.getHostAddress();
1882 switchInfo.put(SWITCH_IP, ip);
1883 }
1884 }
1885
1886 // Write out the switch features info
1887 long capabilities = U32.f(sw.getCapabilities());
1888 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1889 long buffers = U32.f(sw.getBuffers());
1890 switchInfo.put(SWITCH_BUFFERS, buffers);
1891 long tables = U32.f(sw.getTables());
1892 switchInfo.put(SWITCH_TABLES, tables);
1893 long actions = U32.f(sw.getActions());
1894 switchInfo.put(SWITCH_ACTIONS, actions);
1895 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1896
1897 // Update the switch
1898 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1899
1900 // Update the ports
1901 for (OFPhysicalPort port: sw.getPorts()) {
1902 updatePortInfo(sw, port);
1903 }
1904 }
1905
1906 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1907 if (role == Role.SLAVE) {
1908 return;
1909 }
1910 log.debug("Update DB with inactiveSW {}", sw);
1911 // Update the controller info in the storage source to be inactive
1912 Map<String, Object> switchInfo = new HashMap<String, Object>();
1913 String datapathIdString = sw.getStringId();
1914 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1915 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1916 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1917 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1918 }
1919
1920 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1921 if (role == Role.SLAVE) {
1922 return;
1923 }
1924 String datapathIdString = sw.getStringId();
1925 Map<String, Object> portInfo = new HashMap<String, Object>();
1926 int portNumber = U16.f(port.getPortNumber());
1927 String id = datapathIdString + "|" + portNumber;
1928 portInfo.put(PORT_ID, id);
1929 portInfo.put(PORT_SWITCH, datapathIdString);
1930 portInfo.put(PORT_NUMBER, portNumber);
1931 byte[] hardwareAddress = port.getHardwareAddress();
1932 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1933 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1934 String name = port.getName();
1935 portInfo.put(PORT_NAME, name);
1936 long config = U32.f(port.getConfig());
1937 portInfo.put(PORT_CONFIG, config);
1938 long state = U32.f(port.getState());
1939 portInfo.put(PORT_STATE, state);
1940 long currentFeatures = U32.f(port.getCurrentFeatures());
1941 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1942 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1943 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1944 long supportedFeatures = U32.f(port.getSupportedFeatures());
1945 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1946 long peerFeatures = U32.f(port.getPeerFeatures());
1947 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1948 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1949 }
1950
1951 /**
1952 * Read switch port data from storage and write it into a switch object
1953 * @param sw the switch to update
1954 */
1955 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1956 OperatorPredicate op =
1957 new OperatorPredicate(PORT_SWITCH,
1958 OperatorPredicate.Operator.EQ,
1959 sw.getStringId());
1960 IResultSet portResultSet =
1961 storageSource.executeQuery(PORT_TABLE_NAME,
1962 null, op, null);
1963 //Map<Short, OFPhysicalPort> oldports =
1964 // new HashMap<Short, OFPhysicalPort>();
1965 //oldports.putAll(sw.getPorts());
1966
1967 while (portResultSet.next()) {
1968 try {
1969 OFPhysicalPort p = new OFPhysicalPort();
1970 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1971 p.setName(portResultSet.getString(PORT_NAME));
1972 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1973 p.setState((int)portResultSet.getLong(PORT_STATE));
1974 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1975 p.setHardwareAddress(HexString.fromHexString(portMac));
1976 p.setCurrentFeatures((int)portResultSet.
1977 getLong(PORT_CURRENT_FEATURES));
1978 p.setAdvertisedFeatures((int)portResultSet.
1979 getLong(PORT_ADVERTISED_FEATURES));
1980 p.setSupportedFeatures((int)portResultSet.
1981 getLong(PORT_SUPPORTED_FEATURES));
1982 p.setPeerFeatures((int)portResultSet.
1983 getLong(PORT_PEER_FEATURES));
1984 //oldports.remove(Short.valueOf(p.getPortNumber()));
1985 sw.setPort(p);
1986 } catch (NullPointerException e) {
1987 // ignore
1988 }
1989 }
1990 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1991 try {
1992 this.updates.put(update);
1993 } catch (InterruptedException e) {
1994 log.error("Failure adding update to queue", e);
1995 }
1996 }
1997
1998 protected void removePortInfo(IOFSwitch sw, short portNumber) {
1999 if (role == Role.SLAVE) {
2000 return;
2001 }
2002 String datapathIdString = sw.getStringId();
2003 String id = datapathIdString + "|" + portNumber;
2004 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
2005 }
2006
2007 /**
2008 * Sets the initial role based on properties in the config params.
2009 * It looks for two different properties.
2010 * If the "role" property is specified then the value should be
2011 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
2012 * controller is set to the specified value. If the "role" property
2013 * is not specified then it looks next for the "role.path" property.
2014 * In this case the value should be the path to a property file in
2015 * the file system that contains a property called "floodlight.role"
2016 * which can be one of the values listed above for the "role" property.
2017 * The idea behind the "role.path" mechanism is that you have some
2018 * separate heartbeat and master controller election algorithm that
2019 * determines the role of the controller. When a role transition happens,
2020 * it updates the current role in the file specified by the "role.path"
2021 * file. Then if floodlight restarts for some reason it can get the
2022 * correct current role of the controller from the file.
2023 * @param configParams The config params for the FloodlightProvider service
2024 * @return A valid role if role information is specified in the
2025 * config params, otherwise null
2026 */
2027 @LogMessageDocs({
2028 @LogMessageDoc(message="Controller role set to {role}",
2029 explanation="Setting the initial HA role to "),
2030 @LogMessageDoc(level="ERROR",
2031 message="Invalid current role value: {role}",
2032 explanation="An invalid HA role value was read from the " +
2033 "properties file",
2034 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2035 })
2036 protected Role getInitialRole(Map<String, String> configParams) {
2037 Role role = null;
2038 String roleString = configParams.get("role");
2039 if (roleString == null) {
2040 String rolePath = configParams.get("rolepath");
2041 if (rolePath != null) {
2042 Properties properties = new Properties();
2043 try {
2044 properties.load(new FileInputStream(rolePath));
2045 roleString = properties.getProperty("floodlight.role");
2046 }
2047 catch (IOException exc) {
2048 // Don't treat it as an error if the file specified by the
2049 // rolepath property doesn't exist. This lets us enable the
2050 // HA mechanism by just creating/setting the floodlight.role
2051 // property in that file without having to modify the
2052 // floodlight properties.
2053 }
2054 }
2055 }
2056
2057 if (roleString != null) {
2058 // Canonicalize the string to the form used for the enum constants
2059 roleString = roleString.trim().toUpperCase();
2060 try {
2061 role = Role.valueOf(roleString);
2062 }
2063 catch (IllegalArgumentException exc) {
2064 log.error("Invalid current role value: {}", roleString);
2065 }
2066 }
2067
2068 log.info("Controller role set to {}", role);
2069
2070 return role;
2071 }
2072
2073 /**
2074 * Tell controller that we're ready to accept switches loop
2075 * @throws IOException
2076 */
2077 @LogMessageDocs({
2078 @LogMessageDoc(message="Listening for switch connections on {address}",
2079 explanation="The controller is ready and listening for new" +
2080 " switch connections"),
2081 @LogMessageDoc(message="Storage exception in controller " +
2082 "updates loop; terminating process",
2083 explanation=ERROR_DATABASE,
2084 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2085 @LogMessageDoc(level="ERROR",
2086 message="Exception in controller updates loop",
2087 explanation="Failed to dispatch controller event",
2088 recommendation=LogMessageDoc.GENERIC_ACTION)
2089 })
2090 public void run() {
2091 if (log.isDebugEnabled()) {
2092 logListeners();
2093 }
2094
2095 try {
2096 final ServerBootstrap bootstrap = createServerBootStrap();
2097
2098 bootstrap.setOption("reuseAddr", true);
2099 bootstrap.setOption("child.keepAlive", true);
2100 bootstrap.setOption("child.tcpNoDelay", true);
2101 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2102
2103 ChannelPipelineFactory pfact =
2104 new OpenflowPipelineFactory(this, null);
2105 bootstrap.setPipelineFactory(pfact);
2106 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2107 final ChannelGroup cg = new DefaultChannelGroup();
2108 cg.add(bootstrap.bind(sa));
2109
2110 log.info("Listening for switch connections on {}", sa);
2111 } catch (Exception e) {
2112 throw new RuntimeException(e);
2113 }
2114
2115 // main loop
2116 while (true) {
2117 try {
2118 IUpdate update = updates.take();
2119 update.dispatch();
2120 } catch (InterruptedException e) {
2121 return;
2122 } catch (StorageException e) {
2123 log.error("Storage exception in controller " +
2124 "updates loop; terminating process", e);
2125 return;
2126 } catch (Exception e) {
2127 log.error("Exception in controller updates loop", e);
2128 }
2129 }
2130 }
2131
2132 private ServerBootstrap createServerBootStrap() {
2133 if (workerThreads == 0) {
2134 return new ServerBootstrap(
2135 new NioServerSocketChannelFactory(
2136 Executors.newCachedThreadPool(),
2137 Executors.newCachedThreadPool()));
2138 } else {
2139 return new ServerBootstrap(
2140 new NioServerSocketChannelFactory(
2141 Executors.newCachedThreadPool(),
2142 Executors.newCachedThreadPool(), workerThreads));
2143 }
2144 }
2145
2146 public void setConfigParams(Map<String, String> configParams) {
2147 String ofPort = configParams.get("openflowport");
2148 if (ofPort != null) {
2149 this.openFlowPort = Integer.parseInt(ofPort);
2150 }
2151 log.debug("OpenFlow port set to {}", this.openFlowPort);
2152 String threads = configParams.get("workerthreads");
2153 if (threads != null) {
2154 this.workerThreads = Integer.parseInt(threads);
2155 }
2156 log.debug("Number of worker threads set to {}", this.workerThreads);
2157 String controllerId = configParams.get("controllerid");
2158 if (controllerId != null) {
2159 this.controllerId = controllerId;
2160 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002161 else {
2162 //Try to get the hostname of the machine and use that for controller ID
2163 try {
2164 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2165 this.controllerId = hostname;
2166 } catch (UnknownHostException e) {
2167 // Can't get hostname, we'll just use the default
2168 }
2169 }
2170
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002171 log.debug("ControllerId set to {}", this.controllerId);
2172 }
2173
2174 private void initVendorMessages() {
2175 // Configure openflowj to be able to parse the role request/reply
2176 // vendor messages.
2177 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2178 OFNiciraVendorData.NX_VENDOR_ID, 4);
2179 OFVendorId.registerVendorId(niciraVendorId);
2180 OFBasicVendorDataType roleRequestVendorData =
2181 new OFBasicVendorDataType(
2182 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2183 OFRoleRequestVendorData.getInstantiable());
2184 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2185 OFBasicVendorDataType roleReplyVendorData =
2186 new OFBasicVendorDataType(
2187 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2188 OFRoleReplyVendorData.getInstantiable());
2189 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2190 }
2191
2192 /**
2193 * Initialize internal data structures
2194 */
2195 public void init(Map<String, String> configParams) {
2196 // These data structures are initialized here because other
2197 // module's startUp() might be called before ours
2198 this.messageListeners =
2199 new ConcurrentHashMap<OFType,
2200 ListenerDispatcher<OFType,
2201 IOFMessageListener>>();
2202 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2203 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2204 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2205 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2206 this.controllerNodeIPsCache = new HashMap<String, String>();
2207 this.updates = new LinkedBlockingQueue<IUpdate>();
2208 this.factory = new BasicFactory();
2209 this.providerMap = new HashMap<String, List<IInfoProvider>>();
Pankaj Berde15193092013-03-21 17:30:14 -07002210
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002211 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002212 //this.role = getInitialRole(configParams);
2213 //Set the controller's role to MASTER so it always tries to do role requests.
2214 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002215 this.roleChanger = new RoleChanger();
Pankaj Berde15193092013-03-21 17:30:14 -07002216
2217 String conf = configParams.get("dbconf");
2218 if (conf == null) {
2219 conf = "/tmp/cassandra.titan";
2220 }
2221 this.swStore = new SwitchStorageImpl();
2222 this.swStore.init(conf);
2223
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002224 initVendorMessages();
2225 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002226 }
2227
2228 /**
2229 * Startup all of the controller's components
2230 */
2231 @LogMessageDoc(message="Waiting for storage source",
2232 explanation="The system database is not yet ready",
2233 recommendation="If this message persists, this indicates " +
2234 "that the system database has failed to start. " +
2235 LogMessageDoc.CHECK_CONTROLLER)
2236 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002237 try {
2238 registryService.registerController(controllerId);
2239 } catch (RegistryException e2) {
2240 log.warn("Registry service error: {}", e2.getMessage());
2241 }
2242
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002243 // Create the table names we use
2244 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2245 storageSource.createTable(SWITCH_TABLE_NAME, null);
2246 storageSource.createTable(PORT_TABLE_NAME, null);
2247 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2248 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2249 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2250 CONTROLLER_ID);
2251 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2252 SWITCH_DATAPATH_ID);
2253 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2254 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2255 CONTROLLER_INTERFACE_ID);
2256 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2257
2258 while (true) {
2259 try {
2260 updateControllerInfo();
2261 break;
2262 }
2263 catch (StorageException e) {
2264 log.info("Waiting for storage source");
2265 try {
2266 Thread.sleep(1000);
2267 } catch (InterruptedException e1) {
2268 }
2269 }
2270 }
2271
2272 // Add our REST API
2273 restApi.addRestletRoutable(new CoreWebRoutable());
2274 }
2275
2276 @Override
2277 public void addInfoProvider(String type, IInfoProvider provider) {
2278 if (!providerMap.containsKey(type)) {
2279 providerMap.put(type, new ArrayList<IInfoProvider>());
2280 }
2281 providerMap.get(type).add(provider);
2282 }
2283
2284 @Override
2285 public void removeInfoProvider(String type, IInfoProvider provider) {
2286 if (!providerMap.containsKey(type)) {
2287 log.debug("Provider type {} doesn't exist.", type);
2288 return;
2289 }
2290
2291 providerMap.get(type).remove(provider);
2292 }
2293
2294 public Map<String, Object> getControllerInfo(String type) {
2295 if (!providerMap.containsKey(type)) return null;
2296
2297 Map<String, Object> result = new LinkedHashMap<String, Object>();
2298 for (IInfoProvider provider : providerMap.get(type)) {
2299 result.putAll(provider.getInfo(type));
2300 }
2301
2302 return result;
2303 }
2304
2305 @Override
2306 public void addHAListener(IHAListener listener) {
2307 this.haListeners.add(listener);
2308 }
2309
2310 @Override
2311 public void removeHAListener(IHAListener listener) {
2312 this.haListeners.remove(listener);
2313 }
2314
2315
2316 /**
2317 * Handle changes to the controller nodes IPs and dispatch update.
2318 */
2319 @SuppressWarnings("unchecked")
2320 protected void handleControllerNodeIPChanges() {
2321 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2322 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2323 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2324 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2325 CONTROLLER_INTERFACE_TYPE,
2326 CONTROLLER_INTERFACE_NUMBER,
2327 CONTROLLER_INTERFACE_DISCOVERED_IP };
2328 synchronized(controllerNodeIPsCache) {
2329 // We currently assume that interface Ethernet0 is the relevant
2330 // controller interface. Might change.
2331 // We could (should?) implement this using
2332 // predicates, but creating the individual and compound predicate
2333 // seems more overhead then just checking every row. Particularly,
2334 // since the number of rows is small and changes infrequent
2335 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2336 colNames,null, null);
2337 while (res.next()) {
2338 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2339 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2340 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2341 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2342 String curIP = controllerNodeIPsCache.get(controllerID);
2343
2344 curControllerNodeIPs.put(controllerID, discoveredIP);
2345 if (curIP == null) {
2346 // new controller node IP
2347 addedControllerNodeIPs.put(controllerID, discoveredIP);
2348 }
2349 else if (curIP != discoveredIP) {
2350 // IP changed
2351 removedControllerNodeIPs.put(controllerID, curIP);
2352 addedControllerNodeIPs.put(controllerID, discoveredIP);
2353 }
2354 }
2355 }
2356 // Now figure out if rows have been deleted. We can't use the
2357 // rowKeys from rowsDeleted directly, since the tables primary
2358 // key is a compound that we can't disassemble
2359 Set<String> curEntries = curControllerNodeIPs.keySet();
2360 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2361 removedEntries.removeAll(curEntries);
2362 for (String removedControllerID : removedEntries)
2363 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2364 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2365 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2366 curControllerNodeIPs, addedControllerNodeIPs,
2367 removedControllerNodeIPs);
2368 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2369 try {
2370 this.updates.put(update);
2371 } catch (InterruptedException e) {
2372 log.error("Failure adding update to queue", e);
2373 }
2374 }
2375 }
2376 }
2377
2378 @Override
2379 public Map<String, String> getControllerNodeIPs() {
2380 // We return a copy of the mapping so we can guarantee that
2381 // the mapping return is the same as one that will be (or was)
2382 // dispatched to IHAListeners
2383 HashMap<String,String> retval = new HashMap<String,String>();
2384 synchronized(controllerNodeIPsCache) {
2385 retval.putAll(controllerNodeIPsCache);
2386 }
2387 return retval;
2388 }
2389
2390 @Override
2391 public void rowsModified(String tableName, Set<Object> rowKeys) {
2392 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2393 handleControllerNodeIPChanges();
2394 }
2395
2396 }
2397
2398 @Override
2399 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2400 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2401 handleControllerNodeIPChanges();
2402 }
2403 }
2404
2405 @Override
2406 public long getSystemStartTime() {
2407 return (this.systemStartTime);
2408 }
2409
2410 @Override
2411 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2412 this.alwaysClearFlowsOnSwAdd = value;
2413 }
2414
2415 public boolean getAlwaysClearFlowsOnSwAdd() {
2416 return this.alwaysClearFlowsOnSwAdd;
2417 }
2418}