blob: e8c575a1356b2aef233f1ae8c55b2a042bf9bcb2 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080058import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
61import net.floodlightcontroller.core.annotations.LogMessageDoc;
62import net.floodlightcontroller.core.annotations.LogMessageDocs;
63import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
64import net.floodlightcontroller.core.util.ListenerDispatcher;
65import net.floodlightcontroller.core.web.CoreWebRoutable;
66import net.floodlightcontroller.counter.ICounterStoreService;
67import net.floodlightcontroller.packet.Ethernet;
68import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
69import net.floodlightcontroller.restserver.IRestApiService;
70import net.floodlightcontroller.storage.IResultSet;
71import net.floodlightcontroller.storage.IStorageSourceListener;
72import net.floodlightcontroller.storage.IStorageSourceService;
73import net.floodlightcontroller.storage.OperatorPredicate;
74import net.floodlightcontroller.storage.StorageException;
75import net.floodlightcontroller.threadpool.IThreadPoolService;
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -070076import net.onrc.onos.ofcontroller.core.IOFSwitchPortListener;
HIGUCHI Yuta20514902013-06-12 11:24:16 -070077import net.onrc.onos.ofcontroller.core.INetMapTopologyService.ITopoRouteService;
HIGUCHI Yutaedf81d72013-06-12 12:06:57 -070078import net.onrc.onos.ofcontroller.flowcache.IFlowService;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080079import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080080import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080081import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080082
83import org.jboss.netty.bootstrap.ServerBootstrap;
84import org.jboss.netty.buffer.ChannelBuffer;
85import org.jboss.netty.buffer.ChannelBuffers;
86import org.jboss.netty.channel.Channel;
87import org.jboss.netty.channel.ChannelHandlerContext;
88import org.jboss.netty.channel.ChannelPipelineFactory;
89import org.jboss.netty.channel.ChannelStateEvent;
90import org.jboss.netty.channel.ChannelUpstreamHandler;
91import org.jboss.netty.channel.Channels;
92import org.jboss.netty.channel.ExceptionEvent;
93import org.jboss.netty.channel.MessageEvent;
94import org.jboss.netty.channel.group.ChannelGroup;
95import org.jboss.netty.channel.group.DefaultChannelGroup;
96import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
97import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
98import org.jboss.netty.handler.timeout.IdleStateEvent;
99import org.jboss.netty.handler.timeout.ReadTimeoutException;
100import org.openflow.protocol.OFEchoReply;
101import org.openflow.protocol.OFError;
102import org.openflow.protocol.OFError.OFBadActionCode;
103import org.openflow.protocol.OFError.OFBadRequestCode;
104import org.openflow.protocol.OFError.OFErrorType;
105import org.openflow.protocol.OFError.OFFlowModFailedCode;
106import org.openflow.protocol.OFError.OFHelloFailedCode;
107import org.openflow.protocol.OFError.OFPortModFailedCode;
108import org.openflow.protocol.OFError.OFQueueOpFailedCode;
109import org.openflow.protocol.OFFeaturesReply;
110import org.openflow.protocol.OFGetConfigReply;
111import org.openflow.protocol.OFMessage;
112import org.openflow.protocol.OFPacketIn;
113import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800114import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800115import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800116import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800117import org.openflow.protocol.OFPortStatus.OFPortReason;
118import org.openflow.protocol.OFSetConfig;
119import org.openflow.protocol.OFStatisticsRequest;
120import org.openflow.protocol.OFSwitchConfig;
121import org.openflow.protocol.OFType;
122import org.openflow.protocol.OFVendor;
123import org.openflow.protocol.factory.BasicFactory;
124import org.openflow.protocol.factory.MessageParseException;
125import org.openflow.protocol.statistics.OFDescriptionStatistics;
126import org.openflow.protocol.statistics.OFStatistics;
127import org.openflow.protocol.statistics.OFStatisticsType;
128import org.openflow.protocol.vendor.OFBasicVendorDataType;
129import org.openflow.protocol.vendor.OFBasicVendorId;
130import org.openflow.protocol.vendor.OFVendorId;
131import org.openflow.util.HexString;
132import org.openflow.util.U16;
133import org.openflow.util.U32;
134import org.openflow.vendor.nicira.OFNiciraVendorData;
135import org.openflow.vendor.nicira.OFRoleReplyVendorData;
136import org.openflow.vendor.nicira.OFRoleRequestVendorData;
137import org.openflow.vendor.nicira.OFRoleVendorData;
138import org.slf4j.Logger;
139import org.slf4j.LoggerFactory;
140
141
142/**
143 * The main controller class. Handles all setup and network listeners
144 */
145public class Controller implements IFloodlightProviderService,
146 IStorageSourceListener {
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700147
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800148 protected static Logger log = LoggerFactory.getLogger(Controller.class);
149
150 private static final String ERROR_DATABASE =
151 "The controller could not communicate with the system database.";
152
153 protected BasicFactory factory;
154 protected ConcurrentMap<OFType,
155 ListenerDispatcher<OFType,IOFMessageListener>>
156 messageListeners;
157 // The activeSwitches map contains only those switches that are actively
158 // being controlled by us -- it doesn't contain switches that are
159 // in the slave role
160 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
161 // connectedSwitches contains all connected switches, including ones where
162 // we're a slave controller. We need to keep track of them so that we can
163 // send role request messages to switches when our role changes to master
164 // We add a switch to this set after it successfully completes the
165 // handshake. Access to this Set needs to be synchronized with roleChanger
166 protected HashSet<OFSwitchImpl> connectedSwitches;
167
168 // The controllerNodeIPsCache maps Controller IDs to their IP address.
169 // It's only used by handleControllerNodeIPsChanged
170 protected HashMap<String, String> controllerNodeIPsCache;
171
172 protected Set<IOFSwitchListener> switchListeners;
173 protected Set<IHAListener> haListeners;
174 protected Map<String, List<IInfoProvider>> providerMap;
175 protected BlockingQueue<IUpdate> updates;
176
177 // Module dependencies
178 protected IRestApiService restApi;
179 protected ICounterStoreService counterStore = null;
180 protected IStorageSourceService storageSource;
181 protected IPktInProcessingTimeService pktinProcTime;
182 protected IThreadPoolService threadPool;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800183 protected IFlowService flowService;
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800184 protected ITopoRouteService topoRouteService;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800185 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800186
187 // Configuration options
188 protected int openFlowPort = 6633;
189 protected int workerThreads = 0;
190 // The id for this controller node. Should be unique for each controller
191 // node in a controller cluster.
192 protected String controllerId = "localhost";
193
194 // The current role of the controller.
195 // If the controller isn't configured to support roles, then this is null.
196 protected Role role;
197 // A helper that handles sending and timeout handling for role requests
198 protected RoleChanger roleChanger;
199
200 // Start time of the controller
201 protected long systemStartTime;
202
203 // Flag to always flush flow table on switch reconnect (HA or otherwise)
204 protected boolean alwaysClearFlowsOnSwAdd = false;
205
206 // Storage table names
207 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
208 protected static final String CONTROLLER_ID = "id";
209
210 protected static final String SWITCH_TABLE_NAME = "controller_switch";
211 protected static final String SWITCH_DATAPATH_ID = "dpid";
212 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
213 protected static final String SWITCH_IP = "ip";
214 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
215 protected static final String SWITCH_ACTIVE = "active";
216 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
217 protected static final String SWITCH_CAPABILITIES = "capabilities";
218 protected static final String SWITCH_BUFFERS = "buffers";
219 protected static final String SWITCH_TABLES = "tables";
220 protected static final String SWITCH_ACTIONS = "actions";
221
222 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
223 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
224
225 protected static final String PORT_TABLE_NAME = "controller_port";
226 protected static final String PORT_ID = "id";
227 protected static final String PORT_SWITCH = "switch_id";
228 protected static final String PORT_NUMBER = "number";
229 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
230 protected static final String PORT_NAME = "name";
231 protected static final String PORT_CONFIG = "config";
232 protected static final String PORT_STATE = "state";
233 protected static final String PORT_CURRENT_FEATURES = "current_features";
234 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
235 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
236 protected static final String PORT_PEER_FEATURES = "peer_features";
237
238 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
239 protected static final String CONTROLLER_INTERFACE_ID = "id";
240 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
241 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
242 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
243 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
244
245
246
247 // Perf. related configuration
248 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
249 protected static final int BATCH_MAX_SIZE = 100;
250 protected static final boolean ALWAYS_DECODE_ETH = true;
251
252 /**
253 * Updates handled by the main loop
254 */
255 protected interface IUpdate {
256 /**
257 * Calls the appropriate listeners
258 */
259 public void dispatch();
260 }
261 public enum SwitchUpdateType {
262 ADDED,
263 REMOVED,
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700264 PORTCHANGED,
265 PORTADDED,
266 PORTREMOVED
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800267 }
268 /**
269 * Update message indicating a switch was added or removed
270 */
271 protected class SwitchUpdate implements IUpdate {
272 public IOFSwitch sw;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700273 public OFPhysicalPort port;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800274 public SwitchUpdateType switchUpdateType;
275 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
276 this.sw = sw;
277 this.switchUpdateType = switchUpdateType;
278 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700279 public SwitchUpdate(IOFSwitch sw, OFPhysicalPort port, SwitchUpdateType switchUpdateType) {
280 this.sw = sw;
281 this.port = port;
282 this.switchUpdateType = switchUpdateType;
283 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800284 public void dispatch() {
285 if (log.isTraceEnabled()) {
286 log.trace("Dispatching switch update {} {}",
287 sw, switchUpdateType);
288 }
289 if (switchListeners != null) {
290 for (IOFSwitchListener listener : switchListeners) {
291 switch(switchUpdateType) {
292 case ADDED:
293 listener.addedSwitch(sw);
294 break;
295 case REMOVED:
296 listener.removedSwitch(sw);
297 break;
298 case PORTCHANGED:
299 listener.switchPortChanged(sw.getId());
300 break;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700301 case PORTADDED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700302 if (listener instanceof IOFSwitchPortListener) {
303 ((IOFSwitchPortListener) listener).switchPortAdded(sw.getId(), port);
304 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700305 break;
306 case PORTREMOVED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700307 if (listener instanceof IOFSwitchPortListener) {
308 ((IOFSwitchPortListener) listener).switchPortRemoved(sw.getId(), port);
309 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700310 break;
311 default:
312 break;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800313 }
314 }
315 }
316 }
317 }
318
319 /**
320 * Update message indicating controller's role has changed
321 */
322 protected class HARoleUpdate implements IUpdate {
323 public Role oldRole;
324 public Role newRole;
325 public HARoleUpdate(Role newRole, Role oldRole) {
326 this.oldRole = oldRole;
327 this.newRole = newRole;
328 }
329 public void dispatch() {
330 // Make sure that old and new roles are different.
331 if (oldRole == newRole) {
332 if (log.isTraceEnabled()) {
333 log.trace("HA role update ignored as the old and " +
334 "new roles are the same. newRole = {}" +
335 "oldRole = {}", newRole, oldRole);
336 }
337 return;
338 }
339 if (log.isTraceEnabled()) {
340 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
341 newRole, oldRole);
342 }
343 if (haListeners != null) {
344 for (IHAListener listener : haListeners) {
345 listener.roleChanged(oldRole, newRole);
346 }
347 }
348 }
349 }
350
351 /**
352 * Update message indicating
353 * IPs of controllers in controller cluster have changed.
354 */
355 protected class HAControllerNodeIPUpdate implements IUpdate {
356 public Map<String,String> curControllerNodeIPs;
357 public Map<String,String> addedControllerNodeIPs;
358 public Map<String,String> removedControllerNodeIPs;
359 public HAControllerNodeIPUpdate(
360 HashMap<String,String> curControllerNodeIPs,
361 HashMap<String,String> addedControllerNodeIPs,
362 HashMap<String,String> removedControllerNodeIPs) {
363 this.curControllerNodeIPs = curControllerNodeIPs;
364 this.addedControllerNodeIPs = addedControllerNodeIPs;
365 this.removedControllerNodeIPs = removedControllerNodeIPs;
366 }
367 public void dispatch() {
368 if (log.isTraceEnabled()) {
369 log.trace("Dispatching HA Controller Node IP update "
370 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
371 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
372 removedControllerNodeIPs }
373 );
374 }
375 if (haListeners != null) {
376 for (IHAListener listener: haListeners) {
377 listener.controllerNodeIPsChanged(curControllerNodeIPs,
378 addedControllerNodeIPs, removedControllerNodeIPs);
379 }
380 }
381 }
382 }
383
384 // ***************
385 // Getters/Setters
386 // ***************
387
388 public void setStorageSourceService(IStorageSourceService storageSource) {
389 this.storageSource = storageSource;
390 }
391
392 public void setCounterStore(ICounterStoreService counterStore) {
393 this.counterStore = counterStore;
394 }
395
396 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
397 this.pktinProcTime = pits;
398 }
399
400 public void setRestApiService(IRestApiService restApi) {
401 this.restApi = restApi;
402 }
403
404 public void setThreadPoolService(IThreadPoolService tp) {
405 this.threadPool = tp;
406 }
407
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800408 public void setFlowService(IFlowService serviceImpl) {
409 this.flowService = serviceImpl;
410 }
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800411
412 public void setTopoRouteService(ITopoRouteService serviceImpl) {
413 this.topoRouteService = serviceImpl;
414 }
Jonathan Hartc2e95ee2013-02-22 15:25:11 -0800415
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800416 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800417 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800418 }
419
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800420 @Override
421 public Role getRole() {
422 synchronized(roleChanger) {
423 return role;
424 }
425 }
426
427 @Override
428 public void setRole(Role role) {
429 if (role == null) throw new NullPointerException("Role can not be null.");
430 if (role == Role.MASTER && this.role == Role.SLAVE) {
431 // Reset db state to Inactive for all switches.
432 updateAllInactiveSwitchInfo();
433 }
434
435 // Need to synchronize to ensure a reliable ordering on role request
436 // messages send and to ensure the list of connected switches is stable
437 // RoleChanger will handle the actual sending of the message and
438 // timeout handling
439 // @see RoleChanger
440 synchronized(roleChanger) {
441 if (role.equals(this.role)) {
442 log.debug("Ignoring role change: role is already {}", role);
443 return;
444 }
445
446 Role oldRole = this.role;
447 this.role = role;
448
449 log.debug("Submitting role change request to role {}", role);
450 roleChanger.submitRequest(connectedSwitches, role);
451
452 // Enqueue an update for our listeners.
453 try {
454 this.updates.put(new HARoleUpdate(role, oldRole));
455 } catch (InterruptedException e) {
456 log.error("Failure adding update to queue", e);
457 }
458 }
459 }
460
461
462
463 // **********************
464 // ChannelUpstreamHandler
465 // **********************
466
467 /**
468 * Return a new channel handler for processing a switch connections
469 * @param state The channel state object for the connection
470 * @return the new channel handler
471 */
472 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
473 return new OFChannelHandler(state);
474 }
475
Jonathan Hartcc957a02013-02-26 10:39:04 -0800476 protected class RoleChangeCallback implements ControlChangeCallback {
477 @Override
478 public void controlChanged(long dpid, boolean hasControl) {
479 log.info("Role change callback for switch {}, hasControl {}",
480 HexString.toHexString(dpid), hasControl);
481
482 synchronized(roleChanger){
483 OFSwitchImpl sw = null;
484 for (OFSwitchImpl connectedSw : connectedSwitches){
485 if (connectedSw.getId() == dpid){
486 sw = connectedSw;
487 break;
488 }
489 }
490 if (sw == null){
491 log.warn("Switch {} not found in connected switches",
492 HexString.toHexString(dpid));
493 return;
494 }
495
496 Role role = null;
497
Pankaj Berde01939e92013-03-08 14:38:27 -0800498 /*
499 * issue #229
500 * Cannot rely on sw.getRole() as it can be behind due to pending
501 * role changes in the queue. Just submit it and late the RoleChanger
502 * handle duplicates.
503 */
504
505 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800506 role = Role.MASTER;
507 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800508 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800509 role = Role.SLAVE;
510 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800511
512 log.debug("Sending role request {} msg to {}", role, sw);
513 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
514 swList.add(sw);
515 roleChanger.submitRequest(swList, role);
516
Jonathan Hartcc957a02013-02-26 10:39:04 -0800517 }
518
519 }
520 }
521
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800522 /**
523 * Channel handler deals with the switch connection and dispatches
524 * switch messages to the appropriate locations.
525 * @author readams
526 */
527 protected class OFChannelHandler
528 extends IdleStateAwareChannelUpstreamHandler {
529 protected OFSwitchImpl sw;
530 protected OFChannelState state;
531
532 public OFChannelHandler(OFChannelState state) {
533 this.state = state;
534 }
535
536 @Override
537 @LogMessageDoc(message="New switch connection from {ip address}",
538 explanation="A new switch has connected from the " +
539 "specified IP address")
540 public void channelConnected(ChannelHandlerContext ctx,
541 ChannelStateEvent e) throws Exception {
542 log.info("New switch connection from {}",
543 e.getChannel().getRemoteAddress());
544
545 sw = new OFSwitchImpl();
546 sw.setChannel(e.getChannel());
547 sw.setFloodlightProvider(Controller.this);
548 sw.setThreadPoolService(threadPool);
549
550 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
551 msglist.add(factory.getMessage(OFType.HELLO));
552 e.getChannel().write(msglist);
553
554 }
555
556 @Override
557 @LogMessageDoc(message="Disconnected switch {switch information}",
558 explanation="The specified switch has disconnected.")
559 public void channelDisconnected(ChannelHandlerContext ctx,
560 ChannelStateEvent e) throws Exception {
561 if (sw != null && state.hsState == HandshakeState.READY) {
562 if (activeSwitches.containsKey(sw.getId())) {
563 // It's safe to call removeSwitch even though the map might
564 // not contain this particular switch but another with the
565 // same DPID
566 removeSwitch(sw);
567 }
568 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700569 if (controlRequested) {
570 registryService.releaseControl(sw.getId());
571 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800572 connectedSwitches.remove(sw);
573 }
574 sw.setConnected(false);
575 }
576 log.info("Disconnected switch {}", sw);
577 }
578
579 @Override
580 @LogMessageDocs({
581 @LogMessageDoc(level="ERROR",
582 message="Disconnecting switch {switch} due to read timeout",
583 explanation="The connected switch has failed to send any " +
584 "messages or respond to echo requests",
585 recommendation=LogMessageDoc.CHECK_SWITCH),
586 @LogMessageDoc(level="ERROR",
587 message="Disconnecting switch {switch}: failed to " +
588 "complete handshake",
589 explanation="The switch did not respond correctly " +
590 "to handshake messages",
591 recommendation=LogMessageDoc.CHECK_SWITCH),
592 @LogMessageDoc(level="ERROR",
593 message="Disconnecting switch {switch} due to IO Error: {}",
594 explanation="There was an error communicating with the switch",
595 recommendation=LogMessageDoc.CHECK_SWITCH),
596 @LogMessageDoc(level="ERROR",
597 message="Disconnecting switch {switch} due to switch " +
598 "state error: {error}",
599 explanation="The switch sent an unexpected message",
600 recommendation=LogMessageDoc.CHECK_SWITCH),
601 @LogMessageDoc(level="ERROR",
602 message="Disconnecting switch {switch} due to " +
603 "message parse failure",
604 explanation="Could not parse a message from the switch",
605 recommendation=LogMessageDoc.CHECK_SWITCH),
606 @LogMessageDoc(level="ERROR",
607 message="Terminating controller due to storage exception",
608 explanation=ERROR_DATABASE,
609 recommendation=LogMessageDoc.CHECK_CONTROLLER),
610 @LogMessageDoc(level="ERROR",
611 message="Could not process message: queue full",
612 explanation="OpenFlow messages are arriving faster than " +
613 " the controller can process them.",
614 recommendation=LogMessageDoc.CHECK_CONTROLLER),
615 @LogMessageDoc(level="ERROR",
616 message="Error while processing message " +
617 "from switch {switch} {cause}",
618 explanation="An error occurred processing the switch message",
619 recommendation=LogMessageDoc.GENERIC_ACTION)
620 })
621 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
622 throws Exception {
623 if (e.getCause() instanceof ReadTimeoutException) {
624 // switch timeout
625 log.error("Disconnecting switch {} due to read timeout", sw);
626 ctx.getChannel().close();
627 } else if (e.getCause() instanceof HandshakeTimeoutException) {
628 log.error("Disconnecting switch {}: failed to complete handshake",
629 sw);
630 ctx.getChannel().close();
631 } else if (e.getCause() instanceof ClosedChannelException) {
632 //log.warn("Channel for sw {} already closed", sw);
633 } else if (e.getCause() instanceof IOException) {
634 log.error("Disconnecting switch {} due to IO Error: {}",
635 sw, e.getCause().getMessage());
636 ctx.getChannel().close();
637 } else if (e.getCause() instanceof SwitchStateException) {
638 log.error("Disconnecting switch {} due to switch state error: {}",
639 sw, e.getCause().getMessage());
640 ctx.getChannel().close();
641 } else if (e.getCause() instanceof MessageParseException) {
642 log.error("Disconnecting switch " + sw +
643 " due to message parse failure",
644 e.getCause());
645 ctx.getChannel().close();
646 } else if (e.getCause() instanceof StorageException) {
647 log.error("Terminating controller due to storage exception",
648 e.getCause());
649 terminate();
650 } else if (e.getCause() instanceof RejectedExecutionException) {
651 log.warn("Could not process message: queue full");
652 } else {
653 log.error("Error while processing message from switch " + sw,
654 e.getCause());
655 ctx.getChannel().close();
656 }
657 }
658
659 @Override
660 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
661 throws Exception {
662 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
663 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
664 e.getChannel().write(msglist);
665 }
666
667 @Override
668 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
669 throws Exception {
670 if (e.getMessage() instanceof List) {
671 @SuppressWarnings("unchecked")
672 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
673
674 for (OFMessage ofm : msglist) {
675 try {
676 processOFMessage(ofm);
677 }
678 catch (Exception ex) {
679 // We are the last handler in the stream, so run the
680 // exception through the channel again by passing in
681 // ctx.getChannel().
682 Channels.fireExceptionCaught(ctx.getChannel(), ex);
683 }
684 }
685
686 // Flush all flow-mods/packet-out generated from this "train"
687 OFSwitchImpl.flush_all();
688 }
689 }
690
691 /**
692 * Process the request for the switch description
693 */
694 @LogMessageDoc(level="ERROR",
695 message="Exception in reading description " +
696 " during handshake {exception}",
697 explanation="Could not process the switch description string",
698 recommendation=LogMessageDoc.CHECK_SWITCH)
699 void processSwitchDescReply() {
700 try {
701 // Read description, if it has been updated
702 @SuppressWarnings("unchecked")
703 Future<List<OFStatistics>> desc_future =
704 (Future<List<OFStatistics>>)sw.
705 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
706 List<OFStatistics> values =
707 desc_future.get(0, TimeUnit.MILLISECONDS);
708 if (values != null) {
709 OFDescriptionStatistics description =
710 new OFDescriptionStatistics();
711 ChannelBuffer data =
712 ChannelBuffers.buffer(description.getLength());
713 for (OFStatistics f : values) {
714 f.writeTo(data);
715 description.readFrom(data);
716 break; // SHOULD be a list of length 1
717 }
718 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
719 description);
720 sw.setSwitchProperties(description);
721 data = null;
722
723 // At this time, also set other switch properties from storage
724 boolean is_core_switch = false;
725 IResultSet resultSet = null;
726 try {
727 String swid = sw.getStringId();
728 resultSet =
729 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
730 for (Iterator<IResultSet> it =
731 resultSet.iterator(); it.hasNext();) {
732 // In case of multiple rows, use the status
733 // in last row?
734 Map<String, Object> row = it.next().getRow();
735 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
736 if (log.isDebugEnabled()) {
737 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
738 "config for switch={}, is-core={}",
739 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
740 }
741 String ics =
742 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
743 is_core_switch = ics.equals("true");
744 }
745 }
746 }
747 finally {
748 if (resultSet != null)
749 resultSet.close();
750 }
751 if (is_core_switch) {
752 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
753 new Boolean(true));
754 }
755 }
756 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
757 state.hasDescription = true;
758 checkSwitchReady();
759 }
760 catch (InterruptedException ex) {
761 // Ignore
762 }
763 catch (TimeoutException ex) {
764 // Ignore
765 } catch (Exception ex) {
766 log.error("Exception in reading description " +
767 " during handshake", ex);
768 }
769 }
770
771 /**
772 * Send initial switch setup information that we need before adding
773 * the switch
774 * @throws IOException
775 */
776 void sendHelloConfiguration() throws IOException {
777 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700778 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800779 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
780 }
781
782 /**
783 * Send the configuration requests we can only do after we have
784 * the features reply
785 * @throws IOException
786 */
787 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700788 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800789 // Ensure we receive the full packet via PacketIn
790 OFSetConfig config = (OFSetConfig) factory
791 .getMessage(OFType.SET_CONFIG);
792 config.setMissSendLength((short) 0xffff)
793 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
794 sw.write(config, null);
795 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
796 null);
797
798 // Get Description to set switch-specific flags
799 OFStatisticsRequest req = new OFStatisticsRequest();
800 req.setStatisticType(OFStatisticsType.DESC);
801 req.setLengthU(req.getLengthU());
802 Future<List<OFStatistics>> dfuture =
803 sw.getStatistics(req);
804 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
805 dfuture);
806
807 }
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700808
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700809 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800810 protected void checkSwitchReady() {
811 if (state.hsState == HandshakeState.FEATURES_REPLY &&
812 state.hasDescription && state.hasGetConfigReply) {
813
814 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700815 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800816
817 synchronized(roleChanger) {
818 // We need to keep track of all of the switches that are connected
819 // to the controller, in any role, so that we can later send the
820 // role request messages when the controller role changes.
821 // We need to be synchronized while doing this: we must not
822 // send a another role request to the connectedSwitches until
823 // we were able to add this new switch to connectedSwitches
824 // *and* send the current role to the new switch.
825 connectedSwitches.add(sw);
826
827 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800828 //Put the switch in SLAVE mode until we know we have control
829 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
830 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
831 swList.add(sw);
832 roleChanger.submitRequest(swList, Role.SLAVE);
833
Jonathan Hartcc957a02013-02-26 10:39:04 -0800834 //Request control of the switch from the global registry
835 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700836 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800837 registryService.requestControl(sw.getId(),
838 new RoleChangeCallback());
839 } catch (RegistryException e) {
840 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700841 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800842 }
843
Jonathan Hart97801ac2013-02-26 14:29:16 -0800844
Jonathan Hartcc957a02013-02-26 10:39:04 -0800845
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800846 // Send a role request if role support is enabled for the controller
847 // This is a probe that we'll use to determine if the switch
848 // actually supports the role request message. If it does we'll
849 // get back a role reply message. If it doesn't we'll get back an
850 // OFError message.
851 // If role is MASTER we will promote switch to active
852 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800853 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800854 log.debug("This controller's role is {}, " +
855 "sending initial role request msg to {}",
856 role, sw);
857 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
858 swList.add(sw);
859 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800860 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800861 }
862 else {
863 // Role supported not enabled on controller (for now)
864 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800865 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800866 "not sending role request msg to {}",
867 role, sw);
868 // Need to clear FlowMods before we add the switch
869 // and dispatch updates otherwise we have a race condition.
870 sw.clearAllFlowMods();
871 addSwitch(sw);
872 state.firstRoleReplyReceived = true;
873 }
874 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700875 if (!controlRequested) {
876 // yield to allow other thread(s) to release control
877 try {
878 Thread.sleep(10);
879 } catch (InterruptedException e) {
880 // Ignore interruptions
881 }
882 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700883 log.debug("Closing {} because we weren't able to request control " +
884 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700885 sw.channel.close();
886 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800887 }
888 }
889
890 /* Handle a role reply message we received from the switch. Since
891 * netty serializes message dispatch we don't need to synchronize
892 * against other receive operations from the same switch, so no need
893 * to synchronize addSwitch(), removeSwitch() operations from the same
894 * connection.
895 * FIXME: However, when a switch with the same DPID connects we do
896 * need some synchronization. However, handling switches with same
897 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
898 * removedSwitch notification):1
899 *
900 */
901 @LogMessageDoc(level="ERROR",
902 message="Invalid role value in role reply message",
903 explanation="Was unable to set the HA role (master or slave) " +
904 "for the controller.",
905 recommendation=LogMessageDoc.CHECK_CONTROLLER)
906 protected void handleRoleReplyMessage(OFVendor vendorMessage,
907 OFRoleReplyVendorData roleReplyVendorData) {
908 // Map from the role code in the message to our role enum
909 int nxRole = roleReplyVendorData.getRole();
910 Role role = null;
911 switch (nxRole) {
912 case OFRoleVendorData.NX_ROLE_OTHER:
913 role = Role.EQUAL;
914 break;
915 case OFRoleVendorData.NX_ROLE_MASTER:
916 role = Role.MASTER;
917 break;
918 case OFRoleVendorData.NX_ROLE_SLAVE:
919 role = Role.SLAVE;
920 break;
921 default:
922 log.error("Invalid role value in role reply message");
923 sw.getChannel().close();
924 return;
925 }
926
927 log.debug("Handling role reply for role {} from {}. " +
928 "Controller's role is {} ",
929 new Object[] { role, sw, Controller.this.role}
930 );
931
932 sw.deliverRoleReply(vendorMessage.getXid(), role);
933
934 boolean isActive = activeSwitches.containsKey(sw.getId());
935 if (!isActive && sw.isActive()) {
936 // Transition from SLAVE to MASTER.
937
938 if (!state.firstRoleReplyReceived ||
939 getAlwaysClearFlowsOnSwAdd()) {
940 // This is the first role-reply message we receive from
941 // this switch or roles were disabled when the switch
942 // connected:
943 // Delete all pre-existing flows for new connections to
944 // the master
945 //
946 // FIXME: Need to think more about what the test should
947 // be for when we flush the flow-table? For example,
948 // if all the controllers are temporarily in the backup
949 // role (e.g. right after a failure of the master
950 // controller) at the point the switch connects, then
951 // all of the controllers will initially connect as
952 // backup controllers and not flush the flow-table.
953 // Then when one of them is promoted to master following
954 // the master controller election the flow-table
955 // will still not be flushed because that's treated as
956 // a failover event where we don't want to flush the
957 // flow-table. The end result would be that the flow
958 // table for a newly connected switch is never
959 // flushed. Not sure how to handle that case though...
960 sw.clearAllFlowMods();
961 log.debug("First role reply from master switch {}, " +
962 "clear FlowTable to active switch list",
963 HexString.toHexString(sw.getId()));
964 }
965
966 // Some switches don't seem to update us with port
967 // status messages while in slave role.
968 readSwitchPortStateFromStorage(sw);
969
970 // Only add the switch to the active switch list if
971 // we're not in the slave role. Note that if the role
972 // attribute is null, then that means that the switch
973 // doesn't support the role request messages, so in that
974 // case we're effectively in the EQUAL role and the
975 // switch should be included in the active switch list.
976 addSwitch(sw);
977 log.debug("Added master switch {} to active switch list",
978 HexString.toHexString(sw.getId()));
979
980 }
981 else if (isActive && !sw.isActive()) {
982 // Transition from MASTER to SLAVE: remove switch
983 // from active switch list.
984 log.debug("Removed slave switch {} from active switch" +
985 " list", HexString.toHexString(sw.getId()));
986 removeSwitch(sw);
987 }
988
989 // Indicate that we have received a role reply message.
990 state.firstRoleReplyReceived = true;
991 }
992
993 protected boolean handleVendorMessage(OFVendor vendorMessage) {
994 boolean shouldHandleMessage = false;
995 int vendor = vendorMessage.getVendor();
996 switch (vendor) {
997 case OFNiciraVendorData.NX_VENDOR_ID:
998 OFNiciraVendorData niciraVendorData =
999 (OFNiciraVendorData)vendorMessage.getVendorData();
1000 int dataType = niciraVendorData.getDataType();
1001 switch (dataType) {
1002 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
1003 OFRoleReplyVendorData roleReplyVendorData =
1004 (OFRoleReplyVendorData) niciraVendorData;
1005 handleRoleReplyMessage(vendorMessage,
1006 roleReplyVendorData);
1007 break;
1008 default:
1009 log.warn("Unhandled Nicira VENDOR message; " +
1010 "data type = {}", dataType);
1011 break;
1012 }
1013 break;
1014 default:
1015 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
1016 break;
1017 }
1018
1019 return shouldHandleMessage;
1020 }
1021
1022 /**
1023 * Dispatch an Openflow message from a switch to the appropriate
1024 * handler.
1025 * @param m The message to process
1026 * @throws IOException
1027 * @throws SwitchStateException
1028 */
1029 @LogMessageDocs({
1030 @LogMessageDoc(level="WARN",
1031 message="Config Reply from {switch} has " +
1032 "miss length set to {length}",
1033 explanation="The controller requires that the switch " +
1034 "use a miss length of 0xffff for correct " +
1035 "function",
1036 recommendation="Use a different switch to ensure " +
1037 "correct function"),
1038 @LogMessageDoc(level="WARN",
1039 message="Received ERROR from sw {switch} that "
1040 +"indicates roles are not supported "
1041 +"but we have received a valid "
1042 +"role reply earlier",
1043 explanation="The switch sent a confusing message to the" +
1044 "controller")
1045 })
1046 protected void processOFMessage(OFMessage m)
1047 throws IOException, SwitchStateException {
1048 boolean shouldHandleMessage = false;
1049
1050 switch (m.getType()) {
1051 case HELLO:
1052 if (log.isTraceEnabled())
1053 log.trace("HELLO from {}", sw);
1054
1055 if (state.hsState.equals(HandshakeState.START)) {
1056 state.hsState = HandshakeState.HELLO;
1057 sendHelloConfiguration();
1058 } else {
1059 throw new SwitchStateException("Unexpected HELLO from "
1060 + sw);
1061 }
1062 break;
1063 case ECHO_REQUEST:
1064 OFEchoReply reply =
1065 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
1066 reply.setXid(m.getXid());
1067 sw.write(reply, null);
1068 break;
1069 case ECHO_REPLY:
1070 break;
1071 case FEATURES_REPLY:
1072 if (log.isTraceEnabled())
1073 log.trace("Features Reply from {}", sw);
1074
1075 sw.setFeaturesReply((OFFeaturesReply) m);
1076 if (state.hsState.equals(HandshakeState.HELLO)) {
1077 sendFeatureReplyConfiguration();
1078 state.hsState = HandshakeState.FEATURES_REPLY;
1079 // uncomment to enable "dumb" switches like cbench
1080 // state.hsState = HandshakeState.READY;
1081 // addSwitch(sw);
1082 } else {
1083 // return results to rest api caller
1084 sw.deliverOFFeaturesReply(m);
1085 // update database */
1086 updateActiveSwitchInfo(sw);
1087 }
1088 break;
1089 case GET_CONFIG_REPLY:
1090 if (log.isTraceEnabled())
1091 log.trace("Get config reply from {}", sw);
1092
1093 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
1094 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
1095 throw new SwitchStateException(em);
1096 }
1097 OFGetConfigReply cr = (OFGetConfigReply) m;
1098 if (cr.getMissSendLength() == (short)0xffff) {
1099 log.trace("Config Reply from {} confirms " +
1100 "miss length set to 0xffff", sw);
1101 } else {
1102 log.warn("Config Reply from {} has " +
1103 "miss length set to {}",
1104 sw, cr.getMissSendLength() & 0xffff);
1105 }
1106 state.hasGetConfigReply = true;
1107 checkSwitchReady();
1108 break;
1109 case VENDOR:
1110 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1111 break;
1112 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001113 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001114 // TODO: we need better error handling. Especially for
1115 // request/reply style message (stats, roles) we should have
1116 // a unified way to lookup the xid in the error message.
1117 // This will probable involve rewriting the way we handle
1118 // request/reply style messages.
1119 OFError error = (OFError) m;
1120 boolean shouldLogError = true;
1121 // TODO: should we check that firstRoleReplyReceived is false,
1122 // i.e., check only whether the first request fails?
1123 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1124 boolean isBadVendorError =
1125 (error.getErrorType() == OFError.OFErrorType.
1126 OFPET_BAD_REQUEST.getValue());
1127 // We expect to receive a bad vendor error when
1128 // we're connected to a switch that doesn't support
1129 // the Nicira vendor extensions (i.e. not OVS or
1130 // derived from OVS). By protocol, it should also be
1131 // BAD_VENDOR, but too many switch implementations
1132 // get it wrong and we can already check the xid()
1133 // so we can ignore the type with confidence that this
1134 // is not a spurious error
1135 shouldLogError = !isBadVendorError;
1136 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001137 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001138 if (state.firstRoleReplyReceived && (role != null)) {
1139 log.warn("Received ERROR from sw {} that "
1140 +"indicates roles are not supported "
1141 +"but we have received a valid "
1142 +"role reply earlier", sw);
1143 }
1144 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001145 Role requestedRole =
1146 sw.deliverRoleRequestNotSupported(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001147 synchronized(roleChanger) {
1148 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001149 //This will now never happen. The Controller's role
1150 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001151 // the switch doesn't understand role request
1152 // messages and the current controller role is
1153 // slave. We need to disconnect the switch.
1154 // @see RoleChanger for rationale
Jonathan Hart9e92c512013-03-20 16:24:44 -07001155 log.warn("Closing {} channel because controller's role " +
1156 "is SLAVE", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001157 sw.getChannel().close();
1158 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001159 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001160 log.debug("Adding switch {} because we got an error" +
1161 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001162 // Controller's role is master: add to
1163 // active
1164 // TODO: check if clearing flow table is
1165 // right choice here.
1166 // Need to clear FlowMods before we add the switch
1167 // and dispatch updates otherwise we have a race condition.
1168 // TODO: switch update is async. Won't we still have a potential
1169 // race condition?
1170 sw.clearAllFlowMods();
1171 addSwitch(sw);
1172 }
1173 }
1174 }
1175 else {
1176 // TODO: Is this the right thing to do if we receive
1177 // some other error besides a bad vendor error?
1178 // Presumably that means the switch did actually
1179 // understand the role request message, but there
1180 // was some other error from processing the message.
1181 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1182 // error code, but it doesn't look like the Nicira
1183 // role request has that. Should check OVS source
1184 // code to see if it's possible for any other errors
1185 // to be returned.
1186 // If we received an error the switch is not
1187 // in the correct role, so we need to disconnect it.
1188 // We could also resend the request but then we need to
1189 // check if there are other pending request in which
1190 // case we shouldn't resend. If we do resend we need
1191 // to make sure that the switch eventually accepts one
1192 // of our requests or disconnect the switch. This feels
1193 // cumbersome.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001194 log.debug("Closing {} channel because we recieved an " +
1195 "error other than BAD_VENDOR", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001196 sw.getChannel().close();
1197 }
1198 }
1199 // Once we support OF 1.2, we'd add code to handle it here.
1200 //if (error.getXid() == state.ofRoleRequestXid) {
1201 //}
1202 if (shouldLogError)
1203 logError(sw, error);
1204 break;
1205 case STATS_REPLY:
1206 if (state.hsState.ordinal() <
1207 HandshakeState.FEATURES_REPLY.ordinal()) {
1208 String em = "Unexpected STATS_REPLY from " + sw;
1209 throw new SwitchStateException(em);
1210 }
1211 sw.deliverStatisticsReply(m);
1212 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1213 processSwitchDescReply();
1214 }
1215 break;
1216 case PORT_STATUS:
1217 // We want to update our port state info even if we're in
1218 // the slave role, but we only want to update storage if
1219 // we're the master (or equal).
1220 boolean updateStorage = state.hsState.
1221 equals(HandshakeState.READY) &&
1222 (sw.getRole() != Role.SLAVE);
1223 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1224 shouldHandleMessage = true;
1225 break;
1226
1227 default:
1228 shouldHandleMessage = true;
1229 break;
1230 }
1231
1232 if (shouldHandleMessage) {
1233 sw.getListenerReadLock().lock();
1234 try {
1235 if (sw.isConnected()) {
1236 if (!state.hsState.equals(HandshakeState.READY)) {
1237 log.debug("Ignoring message type {} received " +
1238 "from switch {} before switch is " +
1239 "fully configured.", m.getType(), sw);
1240 }
1241 // Check if the controller is in the slave role for the
1242 // switch. If it is, then don't dispatch the message to
1243 // the listeners.
1244 // TODO: Should we dispatch messages that we expect to
1245 // receive when we're in the slave role, e.g. port
1246 // status messages? Since we're "hiding" switches from
1247 // the listeners when we're in the slave role, then it
1248 // seems a little weird to dispatch port status messages
1249 // to them. On the other hand there might be special
1250 // modules that care about all of the connected switches
1251 // and would like to receive port status notifications.
1252 else if (sw.getRole() == Role.SLAVE) {
1253 // Don't log message if it's a port status message
1254 // since we expect to receive those from the switch
1255 // and don't want to emit spurious messages.
1256 if (m.getType() != OFType.PORT_STATUS) {
1257 log.debug("Ignoring message type {} received " +
1258 "from switch {} while in the slave role.",
1259 m.getType(), sw);
1260 }
1261 } else {
1262 handleMessage(sw, m, null);
1263 }
1264 }
1265 }
1266 finally {
1267 sw.getListenerReadLock().unlock();
1268 }
1269 }
1270 }
1271 }
1272
1273 // ****************
1274 // Message handlers
1275 // ****************
1276
1277 protected void handlePortStatusMessage(IOFSwitch sw,
1278 OFPortStatus m,
1279 boolean updateStorage) {
1280 short portNumber = m.getDesc().getPortNumber();
1281 OFPhysicalPort port = m.getDesc();
1282 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001283 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1284 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001285 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001286 if (!portDown) {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001287 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1288 try {
1289 this.updates.put(update);
1290 } catch (InterruptedException e) {
1291 log.error("Failure adding update to queue", e);
1292 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001293 } else {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001294 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1295 try {
1296 this.updates.put(update);
1297 } catch (InterruptedException e) {
1298 log.error("Failure adding update to queue", e);
1299 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001300 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001301 if (updateStorage)
1302 updatePortInfo(sw, port);
1303 log.debug("Port #{} modified for {}", portNumber, sw);
1304 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1305 sw.setPort(port);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001306 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1307 try {
1308 this.updates.put(update);
1309 } catch (InterruptedException e) {
1310 log.error("Failure adding update to queue", e);
1311 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001312 if (updateStorage)
1313 updatePortInfo(sw, port);
1314 log.debug("Port #{} added for {}", portNumber, sw);
1315 } else if (m.getReason() ==
1316 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1317 sw.deletePort(portNumber);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001318 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1319 try {
1320 this.updates.put(update);
1321 } catch (InterruptedException e) {
1322 log.error("Failure adding update to queue", e);
1323 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001324 if (updateStorage)
1325 removePortInfo(sw, portNumber);
1326 log.debug("Port #{} deleted for {}", portNumber, sw);
1327 }
1328 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1329 try {
1330 this.updates.put(update);
1331 } catch (InterruptedException e) {
1332 log.error("Failure adding update to queue", e);
1333 }
1334 }
1335
1336 /**
1337 * flcontext_cache - Keep a thread local stack of contexts
1338 */
1339 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1340 new ThreadLocal <Stack<FloodlightContext>> () {
1341 @Override
1342 protected Stack<FloodlightContext> initialValue() {
1343 return new Stack<FloodlightContext>();
1344 }
1345 };
1346
1347 /**
1348 * flcontext_alloc - pop a context off the stack, if required create a new one
1349 * @return FloodlightContext
1350 */
1351 protected static FloodlightContext flcontext_alloc() {
1352 FloodlightContext flcontext = null;
1353
1354 if (flcontext_cache.get().empty()) {
1355 flcontext = new FloodlightContext();
1356 }
1357 else {
1358 flcontext = flcontext_cache.get().pop();
1359 }
1360
1361 return flcontext;
1362 }
1363
1364 /**
1365 * flcontext_free - Free the context to the current thread
1366 * @param flcontext
1367 */
1368 protected void flcontext_free(FloodlightContext flcontext) {
1369 flcontext.getStorage().clear();
1370 flcontext_cache.get().push(flcontext);
1371 }
1372
1373 /**
1374 * Handle replies to certain OFMessages, and pass others off to listeners
1375 * @param sw The switch for the message
1376 * @param m The message
1377 * @param bContext The floodlight context. If null then floodlight context would
1378 * be allocated in this function
1379 * @throws IOException
1380 */
1381 @LogMessageDocs({
1382 @LogMessageDoc(level="ERROR",
1383 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1384 " field is empty.",
1385 explanation="The switch sent an improperly-formatted PacketIn" +
1386 " message",
1387 recommendation=LogMessageDoc.CHECK_SWITCH),
1388 @LogMessageDoc(level="WARN",
1389 message="Unhandled OF Message: {} from {}",
1390 explanation="The switch sent a message not handled by " +
1391 "the controller")
1392 })
1393 protected void handleMessage(IOFSwitch sw, OFMessage m,
1394 FloodlightContext bContext)
1395 throws IOException {
1396 Ethernet eth = null;
1397
1398 switch (m.getType()) {
1399 case PACKET_IN:
1400 OFPacketIn pi = (OFPacketIn)m;
1401
1402 if (pi.getPacketData().length <= 0) {
1403 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1404 ") because the data field is empty.");
1405 return;
1406 }
1407
1408 if (Controller.ALWAYS_DECODE_ETH) {
1409 eth = new Ethernet();
1410 eth.deserialize(pi.getPacketData(), 0,
1411 pi.getPacketData().length);
1412 counterStore.updatePacketInCounters(sw, m, eth);
1413 }
1414 // fall through to default case...
1415
1416 default:
1417
1418 List<IOFMessageListener> listeners = null;
1419 if (messageListeners.containsKey(m.getType())) {
1420 listeners = messageListeners.get(m.getType()).
1421 getOrderedListeners();
1422 }
1423
1424 FloodlightContext bc = null;
1425 if (listeners != null) {
1426 // Check if floodlight context is passed from the calling
1427 // function, if so use that floodlight context, otherwise
1428 // allocate one
1429 if (bContext == null) {
1430 bc = flcontext_alloc();
1431 } else {
1432 bc = bContext;
1433 }
1434 if (eth != null) {
1435 IFloodlightProviderService.bcStore.put(bc,
1436 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1437 eth);
1438 }
1439
1440 // Get the starting time (overall and per-component) of
1441 // the processing chain for this packet if performance
1442 // monitoring is turned on
1443 pktinProcTime.bootstrap(listeners);
1444 pktinProcTime.recordStartTimePktIn();
1445 Command cmd;
1446 for (IOFMessageListener listener : listeners) {
1447 if (listener instanceof IOFSwitchFilter) {
1448 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1449 continue;
1450 }
1451 }
1452
1453 pktinProcTime.recordStartTimeComp(listener);
1454 cmd = listener.receive(sw, m, bc);
1455 pktinProcTime.recordEndTimeComp(listener);
1456
1457 if (Command.STOP.equals(cmd)) {
1458 break;
1459 }
1460 }
1461 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1462 } else {
1463 log.warn("Unhandled OF Message: {} from {}", m, sw);
1464 }
1465
1466 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1467 }
1468 }
1469
1470 /**
1471 * Log an OpenFlow error message from a switch
1472 * @param sw The switch that sent the error
1473 * @param error The error message
1474 */
1475 @LogMessageDoc(level="ERROR",
1476 message="Error {error type} {error code} from {switch}",
1477 explanation="The switch responded with an unexpected error" +
1478 "to an OpenFlow message from the controller",
1479 recommendation="This could indicate improper network operation. " +
1480 "If the problem persists restarting the switch and " +
1481 "controller may help."
1482 )
1483 protected void logError(IOFSwitch sw, OFError error) {
1484 int etint = 0xffff & error.getErrorType();
1485 if (etint < 0 || etint >= OFErrorType.values().length) {
1486 log.error("Unknown error code {} from sw {}", etint, sw);
1487 }
1488 OFErrorType et = OFErrorType.values()[etint];
1489 switch (et) {
1490 case OFPET_HELLO_FAILED:
1491 OFHelloFailedCode hfc =
1492 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1493 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1494 break;
1495 case OFPET_BAD_REQUEST:
1496 OFBadRequestCode brc =
1497 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1498 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1499 break;
1500 case OFPET_BAD_ACTION:
1501 OFBadActionCode bac =
1502 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1503 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1504 break;
1505 case OFPET_FLOW_MOD_FAILED:
1506 OFFlowModFailedCode fmfc =
1507 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1508 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1509 break;
1510 case OFPET_PORT_MOD_FAILED:
1511 OFPortModFailedCode pmfc =
1512 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1513 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1514 break;
1515 case OFPET_QUEUE_OP_FAILED:
1516 OFQueueOpFailedCode qofc =
1517 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1518 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1519 break;
1520 default:
1521 break;
1522 }
1523 }
1524
1525 /**
1526 * Add a switch to the active switch list and call the switch listeners.
1527 * This happens either when a switch first connects (and the controller is
1528 * not in the slave role) or when the role of the controller changes from
1529 * slave to master.
1530 * @param sw the switch that has been added
1531 */
1532 // TODO: need to rethink locking and the synchronous switch update.
1533 // We can / should also handle duplicate DPIDs in connectedSwitches
1534 @LogMessageDoc(level="ERROR",
1535 message="New switch added {switch} for already-added switch {switch}",
1536 explanation="A switch with the same DPID as another switch " +
1537 "connected to the controller. This can be caused by " +
1538 "multiple switches configured with the same DPID, or " +
1539 "by a switch reconnected very quickly after " +
1540 "disconnecting.",
1541 recommendation="If this happens repeatedly, it is likely there " +
1542 "are switches with duplicate DPIDs on the network. " +
1543 "Reconfigure the appropriate switches. If it happens " +
1544 "very rarely, then it is likely this is a transient " +
1545 "network problem that can be ignored."
1546 )
1547 protected void addSwitch(IOFSwitch sw) {
1548 // TODO: is it safe to modify the HashMap without holding
1549 // the old switch's lock?
1550 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1551 if (sw == oldSw) {
1552 // Note == for object equality, not .equals for value
1553 log.info("New add switch for pre-existing switch {}", sw);
1554 return;
1555 }
1556
1557 if (oldSw != null) {
1558 oldSw.getListenerWriteLock().lock();
1559 try {
1560 log.error("New switch added {} for already-added switch {}",
1561 sw, oldSw);
1562 // Set the connected flag to false to suppress calling
1563 // the listeners for this switch in processOFMessage
1564 oldSw.setConnected(false);
1565
1566 oldSw.cancelAllStatisticsReplies();
1567
1568 updateInactiveSwitchInfo(oldSw);
1569
1570 // we need to clean out old switch state definitively
1571 // before adding the new switch
1572 // FIXME: It seems not completely kosher to call the
1573 // switch listeners here. I thought one of the points of
1574 // having the asynchronous switch update mechanism was so
1575 // the addedSwitch and removedSwitch were always called
1576 // from a single thread to simplify concurrency issues
1577 // for the listener.
1578 if (switchListeners != null) {
1579 for (IOFSwitchListener listener : switchListeners) {
1580 listener.removedSwitch(oldSw);
1581 }
1582 }
1583 // will eventually trigger a removeSwitch(), which will cause
1584 // a "Not removing Switch ... already removed debug message.
1585 // TODO: Figure out a way to handle this that avoids the
1586 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001587 log.debug("Closing {} because a new IOFSwitch got added " +
1588 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001589 oldSw.getChannel().close();
1590 }
1591 finally {
1592 oldSw.getListenerWriteLock().unlock();
1593 }
1594 }
1595
1596 updateActiveSwitchInfo(sw);
1597 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1598 try {
1599 this.updates.put(update);
1600 } catch (InterruptedException e) {
1601 log.error("Failure adding update to queue", e);
1602 }
1603 }
1604
1605 /**
1606 * Remove a switch from the active switch list and call the switch listeners.
1607 * This happens either when the switch is disconnected or when the
1608 * controller's role for the switch changes from master to slave.
1609 * @param sw the switch that has been removed
1610 */
1611 protected void removeSwitch(IOFSwitch sw) {
1612 // No need to acquire the listener lock, since
1613 // this method is only called after netty has processed all
1614 // pending messages
1615 log.debug("removeSwitch: {}", sw);
1616 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1617 log.debug("Not removing switch {}; already removed", sw);
1618 return;
1619 }
1620 // We cancel all outstanding statistics replies if the switch transition
1621 // from active. In the future we might allow statistics requests
1622 // from slave controllers. Then we need to move this cancelation
1623 // to switch disconnect
1624 sw.cancelAllStatisticsReplies();
1625
1626 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1627 // here if role support is enabled. In that case if the switch is being
1628 // removed because we've been switched to being in the slave role, then I think
1629 // it's possible that the new master may have already been promoted to master
1630 // and written out the active switch state to storage. If we now execute
1631 // updateInactiveSwitchInfo we may wipe out all of the state that was
1632 // written out by the new master. Maybe need to revisit how we handle all
1633 // of the switch state that's written to storage.
1634
1635 updateInactiveSwitchInfo(sw);
1636 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1637 try {
1638 this.updates.put(update);
1639 } catch (InterruptedException e) {
1640 log.error("Failure adding update to queue", e);
1641 }
1642 }
1643
1644 // ***************
1645 // IFloodlightProvider
1646 // ***************
1647
1648 @Override
1649 public synchronized void addOFMessageListener(OFType type,
1650 IOFMessageListener listener) {
1651 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1652 messageListeners.get(type);
1653 if (ldd == null) {
1654 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1655 messageListeners.put(type, ldd);
1656 }
1657 ldd.addListener(type, listener);
1658 }
1659
1660 @Override
1661 public synchronized void removeOFMessageListener(OFType type,
1662 IOFMessageListener listener) {
1663 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1664 messageListeners.get(type);
1665 if (ldd != null) {
1666 ldd.removeListener(listener);
1667 }
1668 }
1669
1670 private void logListeners() {
1671 for (Map.Entry<OFType,
1672 ListenerDispatcher<OFType,
1673 IOFMessageListener>> entry
1674 : messageListeners.entrySet()) {
1675
1676 OFType type = entry.getKey();
1677 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1678 entry.getValue();
1679
1680 StringBuffer sb = new StringBuffer();
1681 sb.append("OFListeners for ");
1682 sb.append(type);
1683 sb.append(": ");
1684 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1685 sb.append(l.getName());
1686 sb.append(",");
1687 }
1688 log.debug(sb.toString());
1689 }
1690 }
1691
1692 public void removeOFMessageListeners(OFType type) {
1693 messageListeners.remove(type);
1694 }
1695
1696 @Override
1697 public Map<Long, IOFSwitch> getSwitches() {
1698 return Collections.unmodifiableMap(this.activeSwitches);
1699 }
1700
1701 @Override
1702 public void addOFSwitchListener(IOFSwitchListener listener) {
1703 this.switchListeners.add(listener);
1704 }
1705
1706 @Override
1707 public void removeOFSwitchListener(IOFSwitchListener listener) {
1708 this.switchListeners.remove(listener);
1709 }
1710
1711 @Override
1712 public Map<OFType, List<IOFMessageListener>> getListeners() {
1713 Map<OFType, List<IOFMessageListener>> lers =
1714 new HashMap<OFType, List<IOFMessageListener>>();
1715 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1716 messageListeners.entrySet()) {
1717 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1718 }
1719 return Collections.unmodifiableMap(lers);
1720 }
1721
1722 @Override
1723 @LogMessageDocs({
1724 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1725 "a null switch",
1726 explanation="Failed to process a message because the switch " +
1727 " is no longer connected."),
1728 @LogMessageDoc(level="ERROR",
1729 message="Error reinjecting OFMessage on switch {switch}",
1730 explanation="An I/O error occured while attempting to " +
1731 "process an OpenFlow message",
1732 recommendation=LogMessageDoc.CHECK_SWITCH)
1733 })
1734 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1735 FloodlightContext bc) {
1736 if (sw == null) {
1737 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1738 return false;
1739 }
1740
1741 // FIXME: Do we need to be able to inject messages to switches
1742 // where we're the slave controller (i.e. they're connected but
1743 // not active)?
1744 // FIXME: Don't we need synchronization logic here so we're holding
1745 // the listener read lock when we call handleMessage? After some
1746 // discussions it sounds like the right thing to do here would be to
1747 // inject the message as a netty upstream channel event so it goes
1748 // through the normal netty event processing, including being
1749 // handled
1750 if (!activeSwitches.containsKey(sw.getId())) return false;
1751
1752 try {
1753 // Pass Floodlight context to the handleMessages()
1754 handleMessage(sw, msg, bc);
1755 } catch (IOException e) {
1756 log.error("Error reinjecting OFMessage on switch {}",
1757 HexString.toHexString(sw.getId()));
1758 return false;
1759 }
1760 return true;
1761 }
1762
1763 @Override
1764 @LogMessageDoc(message="Calling System.exit",
1765 explanation="The controller is terminating")
1766 public synchronized void terminate() {
1767 log.info("Calling System.exit");
1768 System.exit(1);
1769 }
1770
1771 @Override
1772 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1773 // call the overloaded version with floodlight context set to null
1774 return injectOfMessage(sw, msg, null);
1775 }
1776
1777 @Override
1778 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1779 FloodlightContext bc) {
1780 if (log.isTraceEnabled()) {
1781 String str = OFMessage.getDataAsString(sw, m, bc);
1782 log.trace("{}", str);
1783 }
1784
1785 List<IOFMessageListener> listeners = null;
1786 if (messageListeners.containsKey(m.getType())) {
1787 listeners =
1788 messageListeners.get(m.getType()).getOrderedListeners();
1789 }
1790
1791 if (listeners != null) {
1792 for (IOFMessageListener listener : listeners) {
1793 if (listener instanceof IOFSwitchFilter) {
1794 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1795 continue;
1796 }
1797 }
1798 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1799 break;
1800 }
1801 }
1802 }
1803 }
1804
1805 @Override
1806 public BasicFactory getOFMessageFactory() {
1807 return factory;
1808 }
1809
1810 @Override
1811 public String getControllerId() {
1812 return controllerId;
1813 }
1814
1815 // **************
1816 // Initialization
1817 // **************
1818
1819 protected void updateAllInactiveSwitchInfo() {
1820 if (role == Role.SLAVE) {
1821 return;
1822 }
1823 String controllerId = getControllerId();
1824 String[] switchColumns = { SWITCH_DATAPATH_ID,
1825 SWITCH_CONTROLLER_ID,
1826 SWITCH_ACTIVE };
1827 String[] portColumns = { PORT_ID, PORT_SWITCH };
1828 IResultSet switchResultSet = null;
1829 try {
1830 OperatorPredicate op =
1831 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1832 OperatorPredicate.Operator.EQ,
1833 controllerId);
1834 switchResultSet =
1835 storageSource.executeQuery(SWITCH_TABLE_NAME,
1836 switchColumns,
1837 op, null);
1838 while (switchResultSet.next()) {
1839 IResultSet portResultSet = null;
1840 try {
1841 String datapathId =
1842 switchResultSet.getString(SWITCH_DATAPATH_ID);
1843 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1844 op = new OperatorPredicate(PORT_SWITCH,
1845 OperatorPredicate.Operator.EQ,
1846 datapathId);
1847 portResultSet =
1848 storageSource.executeQuery(PORT_TABLE_NAME,
1849 portColumns,
1850 op, null);
1851 while (portResultSet.next()) {
1852 portResultSet.deleteRow();
1853 }
1854 portResultSet.save();
1855 }
1856 finally {
1857 if (portResultSet != null)
1858 portResultSet.close();
1859 }
1860 }
1861 switchResultSet.save();
1862 }
1863 finally {
1864 if (switchResultSet != null)
1865 switchResultSet.close();
1866 }
1867 }
1868
1869 protected void updateControllerInfo() {
1870 updateAllInactiveSwitchInfo();
1871
1872 // Write out the controller info to the storage source
1873 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1874 String id = getControllerId();
1875 controllerInfo.put(CONTROLLER_ID, id);
1876 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1877 }
1878
1879 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1880 if (role == Role.SLAVE) {
1881 return;
1882 }
1883 // Obtain the row info for the switch
1884 Map<String, Object> switchInfo = new HashMap<String, Object>();
1885 String datapathIdString = sw.getStringId();
1886 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1887 String controllerId = getControllerId();
1888 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1889 Date connectedSince = sw.getConnectedSince();
1890 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1891 Channel channel = sw.getChannel();
1892 SocketAddress socketAddress = channel.getRemoteAddress();
1893 if (socketAddress != null) {
1894 String socketAddressString = socketAddress.toString();
1895 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1896 if (socketAddress instanceof InetSocketAddress) {
1897 InetSocketAddress inetSocketAddress =
1898 (InetSocketAddress)socketAddress;
1899 InetAddress inetAddress = inetSocketAddress.getAddress();
1900 String ip = inetAddress.getHostAddress();
1901 switchInfo.put(SWITCH_IP, ip);
1902 }
1903 }
1904
1905 // Write out the switch features info
1906 long capabilities = U32.f(sw.getCapabilities());
1907 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1908 long buffers = U32.f(sw.getBuffers());
1909 switchInfo.put(SWITCH_BUFFERS, buffers);
1910 long tables = U32.f(sw.getTables());
1911 switchInfo.put(SWITCH_TABLES, tables);
1912 long actions = U32.f(sw.getActions());
1913 switchInfo.put(SWITCH_ACTIONS, actions);
1914 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1915
1916 // Update the switch
1917 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1918
1919 // Update the ports
1920 for (OFPhysicalPort port: sw.getPorts()) {
1921 updatePortInfo(sw, port);
1922 }
1923 }
1924
1925 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1926 if (role == Role.SLAVE) {
1927 return;
1928 }
1929 log.debug("Update DB with inactiveSW {}", sw);
1930 // Update the controller info in the storage source to be inactive
1931 Map<String, Object> switchInfo = new HashMap<String, Object>();
1932 String datapathIdString = sw.getStringId();
1933 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1934 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1935 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1936 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1937 }
1938
1939 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1940 if (role == Role.SLAVE) {
1941 return;
1942 }
1943 String datapathIdString = sw.getStringId();
1944 Map<String, Object> portInfo = new HashMap<String, Object>();
1945 int portNumber = U16.f(port.getPortNumber());
1946 String id = datapathIdString + "|" + portNumber;
1947 portInfo.put(PORT_ID, id);
1948 portInfo.put(PORT_SWITCH, datapathIdString);
1949 portInfo.put(PORT_NUMBER, portNumber);
1950 byte[] hardwareAddress = port.getHardwareAddress();
1951 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1952 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1953 String name = port.getName();
1954 portInfo.put(PORT_NAME, name);
1955 long config = U32.f(port.getConfig());
1956 portInfo.put(PORT_CONFIG, config);
1957 long state = U32.f(port.getState());
1958 portInfo.put(PORT_STATE, state);
1959 long currentFeatures = U32.f(port.getCurrentFeatures());
1960 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1961 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1962 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1963 long supportedFeatures = U32.f(port.getSupportedFeatures());
1964 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1965 long peerFeatures = U32.f(port.getPeerFeatures());
1966 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1967 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1968 }
1969
1970 /**
1971 * Read switch port data from storage and write it into a switch object
1972 * @param sw the switch to update
1973 */
1974 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1975 OperatorPredicate op =
1976 new OperatorPredicate(PORT_SWITCH,
1977 OperatorPredicate.Operator.EQ,
1978 sw.getStringId());
1979 IResultSet portResultSet =
1980 storageSource.executeQuery(PORT_TABLE_NAME,
1981 null, op, null);
1982 //Map<Short, OFPhysicalPort> oldports =
1983 // new HashMap<Short, OFPhysicalPort>();
1984 //oldports.putAll(sw.getPorts());
1985
1986 while (portResultSet.next()) {
1987 try {
1988 OFPhysicalPort p = new OFPhysicalPort();
1989 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1990 p.setName(portResultSet.getString(PORT_NAME));
1991 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1992 p.setState((int)portResultSet.getLong(PORT_STATE));
1993 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1994 p.setHardwareAddress(HexString.fromHexString(portMac));
1995 p.setCurrentFeatures((int)portResultSet.
1996 getLong(PORT_CURRENT_FEATURES));
1997 p.setAdvertisedFeatures((int)portResultSet.
1998 getLong(PORT_ADVERTISED_FEATURES));
1999 p.setSupportedFeatures((int)portResultSet.
2000 getLong(PORT_SUPPORTED_FEATURES));
2001 p.setPeerFeatures((int)portResultSet.
2002 getLong(PORT_PEER_FEATURES));
2003 //oldports.remove(Short.valueOf(p.getPortNumber()));
2004 sw.setPort(p);
2005 } catch (NullPointerException e) {
2006 // ignore
2007 }
2008 }
2009 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
2010 try {
2011 this.updates.put(update);
2012 } catch (InterruptedException e) {
2013 log.error("Failure adding update to queue", e);
2014 }
2015 }
2016
2017 protected void removePortInfo(IOFSwitch sw, short portNumber) {
2018 if (role == Role.SLAVE) {
2019 return;
2020 }
2021 String datapathIdString = sw.getStringId();
2022 String id = datapathIdString + "|" + portNumber;
2023 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
2024 }
2025
2026 /**
2027 * Sets the initial role based on properties in the config params.
2028 * It looks for two different properties.
2029 * If the "role" property is specified then the value should be
2030 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
2031 * controller is set to the specified value. If the "role" property
2032 * is not specified then it looks next for the "role.path" property.
2033 * In this case the value should be the path to a property file in
2034 * the file system that contains a property called "floodlight.role"
2035 * which can be one of the values listed above for the "role" property.
2036 * The idea behind the "role.path" mechanism is that you have some
2037 * separate heartbeat and master controller election algorithm that
2038 * determines the role of the controller. When a role transition happens,
2039 * it updates the current role in the file specified by the "role.path"
2040 * file. Then if floodlight restarts for some reason it can get the
2041 * correct current role of the controller from the file.
2042 * @param configParams The config params for the FloodlightProvider service
2043 * @return A valid role if role information is specified in the
2044 * config params, otherwise null
2045 */
2046 @LogMessageDocs({
2047 @LogMessageDoc(message="Controller role set to {role}",
2048 explanation="Setting the initial HA role to "),
2049 @LogMessageDoc(level="ERROR",
2050 message="Invalid current role value: {role}",
2051 explanation="An invalid HA role value was read from the " +
2052 "properties file",
2053 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2054 })
2055 protected Role getInitialRole(Map<String, String> configParams) {
2056 Role role = null;
2057 String roleString = configParams.get("role");
2058 if (roleString == null) {
2059 String rolePath = configParams.get("rolepath");
2060 if (rolePath != null) {
2061 Properties properties = new Properties();
2062 try {
2063 properties.load(new FileInputStream(rolePath));
2064 roleString = properties.getProperty("floodlight.role");
2065 }
2066 catch (IOException exc) {
2067 // Don't treat it as an error if the file specified by the
2068 // rolepath property doesn't exist. This lets us enable the
2069 // HA mechanism by just creating/setting the floodlight.role
2070 // property in that file without having to modify the
2071 // floodlight properties.
2072 }
2073 }
2074 }
2075
2076 if (roleString != null) {
2077 // Canonicalize the string to the form used for the enum constants
2078 roleString = roleString.trim().toUpperCase();
2079 try {
2080 role = Role.valueOf(roleString);
2081 }
2082 catch (IllegalArgumentException exc) {
2083 log.error("Invalid current role value: {}", roleString);
2084 }
2085 }
2086
2087 log.info("Controller role set to {}", role);
2088
2089 return role;
2090 }
2091
2092 /**
2093 * Tell controller that we're ready to accept switches loop
2094 * @throws IOException
2095 */
2096 @LogMessageDocs({
2097 @LogMessageDoc(message="Listening for switch connections on {address}",
2098 explanation="The controller is ready and listening for new" +
2099 " switch connections"),
2100 @LogMessageDoc(message="Storage exception in controller " +
2101 "updates loop; terminating process",
2102 explanation=ERROR_DATABASE,
2103 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2104 @LogMessageDoc(level="ERROR",
2105 message="Exception in controller updates loop",
2106 explanation="Failed to dispatch controller event",
2107 recommendation=LogMessageDoc.GENERIC_ACTION)
2108 })
2109 public void run() {
2110 if (log.isDebugEnabled()) {
2111 logListeners();
2112 }
2113
2114 try {
2115 final ServerBootstrap bootstrap = createServerBootStrap();
2116
2117 bootstrap.setOption("reuseAddr", true);
2118 bootstrap.setOption("child.keepAlive", true);
2119 bootstrap.setOption("child.tcpNoDelay", true);
2120 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2121
2122 ChannelPipelineFactory pfact =
2123 new OpenflowPipelineFactory(this, null);
2124 bootstrap.setPipelineFactory(pfact);
2125 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2126 final ChannelGroup cg = new DefaultChannelGroup();
2127 cg.add(bootstrap.bind(sa));
2128
2129 log.info("Listening for switch connections on {}", sa);
2130 } catch (Exception e) {
2131 throw new RuntimeException(e);
2132 }
2133
2134 // main loop
2135 while (true) {
2136 try {
2137 IUpdate update = updates.take();
2138 update.dispatch();
2139 } catch (InterruptedException e) {
2140 return;
2141 } catch (StorageException e) {
2142 log.error("Storage exception in controller " +
2143 "updates loop; terminating process", e);
2144 return;
2145 } catch (Exception e) {
2146 log.error("Exception in controller updates loop", e);
2147 }
2148 }
2149 }
2150
2151 private ServerBootstrap createServerBootStrap() {
2152 if (workerThreads == 0) {
2153 return new ServerBootstrap(
2154 new NioServerSocketChannelFactory(
2155 Executors.newCachedThreadPool(),
2156 Executors.newCachedThreadPool()));
2157 } else {
2158 return new ServerBootstrap(
2159 new NioServerSocketChannelFactory(
2160 Executors.newCachedThreadPool(),
2161 Executors.newCachedThreadPool(), workerThreads));
2162 }
2163 }
2164
2165 public void setConfigParams(Map<String, String> configParams) {
2166 String ofPort = configParams.get("openflowport");
2167 if (ofPort != null) {
2168 this.openFlowPort = Integer.parseInt(ofPort);
2169 }
2170 log.debug("OpenFlow port set to {}", this.openFlowPort);
2171 String threads = configParams.get("workerthreads");
2172 if (threads != null) {
2173 this.workerThreads = Integer.parseInt(threads);
2174 }
2175 log.debug("Number of worker threads set to {}", this.workerThreads);
2176 String controllerId = configParams.get("controllerid");
2177 if (controllerId != null) {
2178 this.controllerId = controllerId;
2179 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002180 else {
2181 //Try to get the hostname of the machine and use that for controller ID
2182 try {
2183 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2184 this.controllerId = hostname;
2185 } catch (UnknownHostException e) {
2186 // Can't get hostname, we'll just use the default
2187 }
2188 }
2189
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002190 log.debug("ControllerId set to {}", this.controllerId);
2191 }
2192
2193 private void initVendorMessages() {
2194 // Configure openflowj to be able to parse the role request/reply
2195 // vendor messages.
2196 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2197 OFNiciraVendorData.NX_VENDOR_ID, 4);
2198 OFVendorId.registerVendorId(niciraVendorId);
2199 OFBasicVendorDataType roleRequestVendorData =
2200 new OFBasicVendorDataType(
2201 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2202 OFRoleRequestVendorData.getInstantiable());
2203 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2204 OFBasicVendorDataType roleReplyVendorData =
2205 new OFBasicVendorDataType(
2206 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2207 OFRoleReplyVendorData.getInstantiable());
2208 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2209 }
2210
2211 /**
2212 * Initialize internal data structures
2213 */
2214 public void init(Map<String, String> configParams) {
2215 // These data structures are initialized here because other
2216 // module's startUp() might be called before ours
2217 this.messageListeners =
2218 new ConcurrentHashMap<OFType,
2219 ListenerDispatcher<OFType,
2220 IOFMessageListener>>();
2221 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2222 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2223 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2224 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2225 this.controllerNodeIPsCache = new HashMap<String, String>();
2226 this.updates = new LinkedBlockingQueue<IUpdate>();
2227 this.factory = new BasicFactory();
2228 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2229 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002230 //this.role = getInitialRole(configParams);
2231 //Set the controller's role to MASTER so it always tries to do role requests.
2232 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002233 this.roleChanger = new RoleChanger();
2234 initVendorMessages();
2235 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002236 }
2237
2238 /**
2239 * Startup all of the controller's components
2240 */
2241 @LogMessageDoc(message="Waiting for storage source",
2242 explanation="The system database is not yet ready",
2243 recommendation="If this message persists, this indicates " +
2244 "that the system database has failed to start. " +
2245 LogMessageDoc.CHECK_CONTROLLER)
2246 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002247 try {
2248 registryService.registerController(controllerId);
2249 } catch (RegistryException e2) {
2250 log.warn("Registry service error: {}", e2.getMessage());
2251 }
2252
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002253 // Create the table names we use
2254 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2255 storageSource.createTable(SWITCH_TABLE_NAME, null);
2256 storageSource.createTable(PORT_TABLE_NAME, null);
2257 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2258 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2259 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2260 CONTROLLER_ID);
2261 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2262 SWITCH_DATAPATH_ID);
2263 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2264 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2265 CONTROLLER_INTERFACE_ID);
2266 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2267
2268 while (true) {
2269 try {
2270 updateControllerInfo();
2271 break;
2272 }
2273 catch (StorageException e) {
2274 log.info("Waiting for storage source");
2275 try {
2276 Thread.sleep(1000);
2277 } catch (InterruptedException e1) {
2278 }
2279 }
2280 }
2281
2282 // Add our REST API
2283 restApi.addRestletRoutable(new CoreWebRoutable());
2284 }
2285
2286 @Override
2287 public void addInfoProvider(String type, IInfoProvider provider) {
2288 if (!providerMap.containsKey(type)) {
2289 providerMap.put(type, new ArrayList<IInfoProvider>());
2290 }
2291 providerMap.get(type).add(provider);
2292 }
2293
2294 @Override
2295 public void removeInfoProvider(String type, IInfoProvider provider) {
2296 if (!providerMap.containsKey(type)) {
2297 log.debug("Provider type {} doesn't exist.", type);
2298 return;
2299 }
2300
2301 providerMap.get(type).remove(provider);
2302 }
2303
2304 public Map<String, Object> getControllerInfo(String type) {
2305 if (!providerMap.containsKey(type)) return null;
2306
2307 Map<String, Object> result = new LinkedHashMap<String, Object>();
2308 for (IInfoProvider provider : providerMap.get(type)) {
2309 result.putAll(provider.getInfo(type));
2310 }
2311
2312 return result;
2313 }
2314
2315 @Override
2316 public void addHAListener(IHAListener listener) {
2317 this.haListeners.add(listener);
2318 }
2319
2320 @Override
2321 public void removeHAListener(IHAListener listener) {
2322 this.haListeners.remove(listener);
2323 }
2324
2325
2326 /**
2327 * Handle changes to the controller nodes IPs and dispatch update.
2328 */
2329 @SuppressWarnings("unchecked")
2330 protected void handleControllerNodeIPChanges() {
2331 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2332 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2333 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2334 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2335 CONTROLLER_INTERFACE_TYPE,
2336 CONTROLLER_INTERFACE_NUMBER,
2337 CONTROLLER_INTERFACE_DISCOVERED_IP };
2338 synchronized(controllerNodeIPsCache) {
2339 // We currently assume that interface Ethernet0 is the relevant
2340 // controller interface. Might change.
2341 // We could (should?) implement this using
2342 // predicates, but creating the individual and compound predicate
2343 // seems more overhead then just checking every row. Particularly,
2344 // since the number of rows is small and changes infrequent
2345 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2346 colNames,null, null);
2347 while (res.next()) {
2348 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2349 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2350 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2351 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2352 String curIP = controllerNodeIPsCache.get(controllerID);
2353
2354 curControllerNodeIPs.put(controllerID, discoveredIP);
2355 if (curIP == null) {
2356 // new controller node IP
2357 addedControllerNodeIPs.put(controllerID, discoveredIP);
2358 }
2359 else if (curIP != discoveredIP) {
2360 // IP changed
2361 removedControllerNodeIPs.put(controllerID, curIP);
2362 addedControllerNodeIPs.put(controllerID, discoveredIP);
2363 }
2364 }
2365 }
2366 // Now figure out if rows have been deleted. We can't use the
2367 // rowKeys from rowsDeleted directly, since the tables primary
2368 // key is a compound that we can't disassemble
2369 Set<String> curEntries = curControllerNodeIPs.keySet();
2370 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2371 removedEntries.removeAll(curEntries);
2372 for (String removedControllerID : removedEntries)
2373 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2374 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2375 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2376 curControllerNodeIPs, addedControllerNodeIPs,
2377 removedControllerNodeIPs);
2378 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2379 try {
2380 this.updates.put(update);
2381 } catch (InterruptedException e) {
2382 log.error("Failure adding update to queue", e);
2383 }
2384 }
2385 }
2386 }
2387
2388 @Override
2389 public Map<String, String> getControllerNodeIPs() {
2390 // We return a copy of the mapping so we can guarantee that
2391 // the mapping return is the same as one that will be (or was)
2392 // dispatched to IHAListeners
2393 HashMap<String,String> retval = new HashMap<String,String>();
2394 synchronized(controllerNodeIPsCache) {
2395 retval.putAll(controllerNodeIPsCache);
2396 }
2397 return retval;
2398 }
2399
2400 @Override
2401 public void rowsModified(String tableName, Set<Object> rowKeys) {
2402 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2403 handleControllerNodeIPChanges();
2404 }
2405
2406 }
2407
2408 @Override
2409 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2410 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2411 handleControllerNodeIPChanges();
2412 }
2413 }
2414
2415 @Override
2416 public long getSystemStartTime() {
2417 return (this.systemStartTime);
2418 }
2419
2420 @Override
2421 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2422 this.alwaysClearFlowsOnSwAdd = value;
2423 }
2424
2425 public boolean getAlwaysClearFlowsOnSwAdd() {
2426 return this.alwaysClearFlowsOnSwAdd;
2427 }
2428}