blob: 6b16964483e6fdc80ce8a07346e9fcd6a7c1ab48 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080058import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berdedc73bb12013-08-14 13:46:38 -070061import net.floodlightcontroller.core.IUpdate;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080062import net.floodlightcontroller.core.annotations.LogMessageDoc;
63import net.floodlightcontroller.core.annotations.LogMessageDocs;
64import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
65import net.floodlightcontroller.core.util.ListenerDispatcher;
66import net.floodlightcontroller.core.web.CoreWebRoutable;
67import net.floodlightcontroller.counter.ICounterStoreService;
68import net.floodlightcontroller.packet.Ethernet;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080069import net.floodlightcontroller.restserver.IRestApiService;
70import net.floodlightcontroller.storage.IResultSet;
71import net.floodlightcontroller.storage.IStorageSourceListener;
72import net.floodlightcontroller.storage.IStorageSourceService;
73import net.floodlightcontroller.storage.OperatorPredicate;
74import net.floodlightcontroller.storage.StorageException;
75import net.floodlightcontroller.threadpool.IThreadPoolService;
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -070076import net.onrc.onos.ofcontroller.core.IOFSwitchPortListener;
Jonathan Hart8a5d0972013-12-04 10:02:44 -080077import net.onrc.onos.ofcontroller.linkdiscovery.ILinkDiscoveryService;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080078import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080079import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080080import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080081
82import org.jboss.netty.bootstrap.ServerBootstrap;
83import org.jboss.netty.buffer.ChannelBuffer;
84import org.jboss.netty.buffer.ChannelBuffers;
85import org.jboss.netty.channel.Channel;
86import org.jboss.netty.channel.ChannelHandlerContext;
87import org.jboss.netty.channel.ChannelPipelineFactory;
88import org.jboss.netty.channel.ChannelStateEvent;
89import org.jboss.netty.channel.ChannelUpstreamHandler;
90import org.jboss.netty.channel.Channels;
91import org.jboss.netty.channel.ExceptionEvent;
92import org.jboss.netty.channel.MessageEvent;
93import org.jboss.netty.channel.group.ChannelGroup;
94import org.jboss.netty.channel.group.DefaultChannelGroup;
95import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
96import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
97import org.jboss.netty.handler.timeout.IdleStateEvent;
98import org.jboss.netty.handler.timeout.ReadTimeoutException;
99import org.openflow.protocol.OFEchoReply;
100import org.openflow.protocol.OFError;
101import org.openflow.protocol.OFError.OFBadActionCode;
102import org.openflow.protocol.OFError.OFBadRequestCode;
103import org.openflow.protocol.OFError.OFErrorType;
104import org.openflow.protocol.OFError.OFFlowModFailedCode;
105import org.openflow.protocol.OFError.OFHelloFailedCode;
106import org.openflow.protocol.OFError.OFPortModFailedCode;
107import org.openflow.protocol.OFError.OFQueueOpFailedCode;
108import org.openflow.protocol.OFFeaturesReply;
109import org.openflow.protocol.OFGetConfigReply;
110import org.openflow.protocol.OFMessage;
111import org.openflow.protocol.OFPacketIn;
112import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800113import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800114import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800115import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800116import org.openflow.protocol.OFPortStatus.OFPortReason;
117import org.openflow.protocol.OFSetConfig;
118import org.openflow.protocol.OFStatisticsRequest;
119import org.openflow.protocol.OFSwitchConfig;
120import org.openflow.protocol.OFType;
121import org.openflow.protocol.OFVendor;
122import org.openflow.protocol.factory.BasicFactory;
123import org.openflow.protocol.factory.MessageParseException;
124import org.openflow.protocol.statistics.OFDescriptionStatistics;
125import org.openflow.protocol.statistics.OFStatistics;
126import org.openflow.protocol.statistics.OFStatisticsType;
127import org.openflow.protocol.vendor.OFBasicVendorDataType;
128import org.openflow.protocol.vendor.OFBasicVendorId;
129import org.openflow.protocol.vendor.OFVendorId;
130import org.openflow.util.HexString;
131import org.openflow.util.U16;
132import org.openflow.util.U32;
133import org.openflow.vendor.nicira.OFNiciraVendorData;
134import org.openflow.vendor.nicira.OFRoleReplyVendorData;
135import org.openflow.vendor.nicira.OFRoleRequestVendorData;
136import org.openflow.vendor.nicira.OFRoleVendorData;
137import org.slf4j.Logger;
138import org.slf4j.LoggerFactory;
139
140
141/**
142 * The main controller class. Handles all setup and network listeners
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700143 *
144 * Extensions made by ONOS are:
145 * - Detailed Port event: PORTCHANGED -> {PORTCHANGED, PORTADDED, PORTREMOVED}
146 * Available as net.onrc.onos.ofcontroller.core.IOFSwitchPortListener
147 * - Distributed ownership control of switch through RegistryService(IControllerRegistryService)
Pavlin Radoslavova653e9f2013-10-16 03:08:52 -0700148 * - Register ONOS services. (IControllerRegistryService)
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700149 * - Additional DEBUG logs
150 * - Try using hostname as controller ID, when ID was not explicitly given.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800151 */
152public class Controller implements IFloodlightProviderService,
153 IStorageSourceListener {
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700154
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -0700155 protected final static Logger log = LoggerFactory.getLogger(Controller.class);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800156
157 private static final String ERROR_DATABASE =
158 "The controller could not communicate with the system database.";
159
160 protected BasicFactory factory;
161 protected ConcurrentMap<OFType,
162 ListenerDispatcher<OFType,IOFMessageListener>>
163 messageListeners;
164 // The activeSwitches map contains only those switches that are actively
165 // being controlled by us -- it doesn't contain switches that are
166 // in the slave role
167 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
168 // connectedSwitches contains all connected switches, including ones where
169 // we're a slave controller. We need to keep track of them so that we can
170 // send role request messages to switches when our role changes to master
171 // We add a switch to this set after it successfully completes the
172 // handshake. Access to this Set needs to be synchronized with roleChanger
173 protected HashSet<OFSwitchImpl> connectedSwitches;
174
175 // The controllerNodeIPsCache maps Controller IDs to their IP address.
176 // It's only used by handleControllerNodeIPsChanged
177 protected HashMap<String, String> controllerNodeIPsCache;
178
179 protected Set<IOFSwitchListener> switchListeners;
180 protected Set<IHAListener> haListeners;
181 protected Map<String, List<IInfoProvider>> providerMap;
182 protected BlockingQueue<IUpdate> updates;
183
184 // Module dependencies
185 protected IRestApiService restApi;
186 protected ICounterStoreService counterStore = null;
187 protected IStorageSourceService storageSource;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800188 protected IThreadPoolService threadPool;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800189 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800190
Jonathan Hart8a5d0972013-12-04 10:02:44 -0800191 protected ILinkDiscoveryService linkDiscovery;
192
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800193 // Configuration options
194 protected int openFlowPort = 6633;
195 protected int workerThreads = 0;
196 // The id for this controller node. Should be unique for each controller
197 // node in a controller cluster.
198 protected String controllerId = "localhost";
199
200 // The current role of the controller.
201 // If the controller isn't configured to support roles, then this is null.
202 protected Role role;
203 // A helper that handles sending and timeout handling for role requests
204 protected RoleChanger roleChanger;
205
206 // Start time of the controller
207 protected long systemStartTime;
208
209 // Flag to always flush flow table on switch reconnect (HA or otherwise)
210 protected boolean alwaysClearFlowsOnSwAdd = false;
211
212 // Storage table names
213 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
214 protected static final String CONTROLLER_ID = "id";
215
216 protected static final String SWITCH_TABLE_NAME = "controller_switch";
217 protected static final String SWITCH_DATAPATH_ID = "dpid";
218 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
219 protected static final String SWITCH_IP = "ip";
220 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
221 protected static final String SWITCH_ACTIVE = "active";
222 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
223 protected static final String SWITCH_CAPABILITIES = "capabilities";
224 protected static final String SWITCH_BUFFERS = "buffers";
225 protected static final String SWITCH_TABLES = "tables";
226 protected static final String SWITCH_ACTIONS = "actions";
227
228 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
229 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
230
231 protected static final String PORT_TABLE_NAME = "controller_port";
232 protected static final String PORT_ID = "id";
233 protected static final String PORT_SWITCH = "switch_id";
234 protected static final String PORT_NUMBER = "number";
235 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
236 protected static final String PORT_NAME = "name";
237 protected static final String PORT_CONFIG = "config";
238 protected static final String PORT_STATE = "state";
239 protected static final String PORT_CURRENT_FEATURES = "current_features";
240 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
241 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
242 protected static final String PORT_PEER_FEATURES = "peer_features";
243
244 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
245 protected static final String CONTROLLER_INTERFACE_ID = "id";
246 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
247 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
248 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
249 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
250
251
252
253 // Perf. related configuration
254 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
255 protected static final int BATCH_MAX_SIZE = 100;
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700256 protected static final boolean ALWAYS_DECODE_ETH = true;
257
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800258 public enum SwitchUpdateType {
259 ADDED,
260 REMOVED,
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700261 PORTCHANGED,
262 PORTADDED,
263 PORTREMOVED
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800264 }
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700265
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800266 /**
267 * Update message indicating a switch was added or removed
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700268 * ONOS: This message extended to indicate Port add or removed event.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800269 */
270 protected class SwitchUpdate implements IUpdate {
271 public IOFSwitch sw;
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700272 public OFPhysicalPort port; // Added by ONOS
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800273 public SwitchUpdateType switchUpdateType;
274 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
275 this.sw = sw;
276 this.switchUpdateType = switchUpdateType;
277 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700278 public SwitchUpdate(IOFSwitch sw, OFPhysicalPort port, SwitchUpdateType switchUpdateType) {
279 this.sw = sw;
280 this.port = port;
281 this.switchUpdateType = switchUpdateType;
282 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800283 public void dispatch() {
284 if (log.isTraceEnabled()) {
285 log.trace("Dispatching switch update {} {}",
286 sw, switchUpdateType);
287 }
288 if (switchListeners != null) {
289 for (IOFSwitchListener listener : switchListeners) {
290 switch(switchUpdateType) {
291 case ADDED:
292 listener.addedSwitch(sw);
293 break;
294 case REMOVED:
295 listener.removedSwitch(sw);
296 break;
297 case PORTCHANGED:
298 listener.switchPortChanged(sw.getId());
299 break;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700300 case PORTADDED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700301 if (listener instanceof IOFSwitchPortListener) {
302 ((IOFSwitchPortListener) listener).switchPortAdded(sw.getId(), port);
303 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700304 break;
305 case PORTREMOVED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700306 if (listener instanceof IOFSwitchPortListener) {
307 ((IOFSwitchPortListener) listener).switchPortRemoved(sw.getId(), port);
308 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700309 break;
310 default:
311 break;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800312 }
313 }
314 }
315 }
316 }
317
318 /**
319 * Update message indicating controller's role has changed
320 */
321 protected class HARoleUpdate implements IUpdate {
322 public Role oldRole;
323 public Role newRole;
324 public HARoleUpdate(Role newRole, Role oldRole) {
325 this.oldRole = oldRole;
326 this.newRole = newRole;
327 }
328 public void dispatch() {
329 // Make sure that old and new roles are different.
330 if (oldRole == newRole) {
331 if (log.isTraceEnabled()) {
332 log.trace("HA role update ignored as the old and " +
333 "new roles are the same. newRole = {}" +
334 "oldRole = {}", newRole, oldRole);
335 }
336 return;
337 }
338 if (log.isTraceEnabled()) {
339 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
340 newRole, oldRole);
341 }
342 if (haListeners != null) {
343 for (IHAListener listener : haListeners) {
344 listener.roleChanged(oldRole, newRole);
345 }
346 }
347 }
348 }
349
350 /**
351 * Update message indicating
352 * IPs of controllers in controller cluster have changed.
353 */
354 protected class HAControllerNodeIPUpdate implements IUpdate {
355 public Map<String,String> curControllerNodeIPs;
356 public Map<String,String> addedControllerNodeIPs;
357 public Map<String,String> removedControllerNodeIPs;
358 public HAControllerNodeIPUpdate(
359 HashMap<String,String> curControllerNodeIPs,
360 HashMap<String,String> addedControllerNodeIPs,
361 HashMap<String,String> removedControllerNodeIPs) {
362 this.curControllerNodeIPs = curControllerNodeIPs;
363 this.addedControllerNodeIPs = addedControllerNodeIPs;
364 this.removedControllerNodeIPs = removedControllerNodeIPs;
365 }
366 public void dispatch() {
367 if (log.isTraceEnabled()) {
368 log.trace("Dispatching HA Controller Node IP update "
369 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
370 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
371 removedControllerNodeIPs }
372 );
373 }
374 if (haListeners != null) {
375 for (IHAListener listener: haListeners) {
376 listener.controllerNodeIPsChanged(curControllerNodeIPs,
377 addedControllerNodeIPs, removedControllerNodeIPs);
378 }
379 }
380 }
381 }
382
383 // ***************
384 // Getters/Setters
385 // ***************
386
387 public void setStorageSourceService(IStorageSourceService storageSource) {
388 this.storageSource = storageSource;
389 }
390
391 public void setCounterStore(ICounterStoreService counterStore) {
392 this.counterStore = counterStore;
393 }
394
mininet73e7fb72013-12-03 14:25:53 -0800395
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800396
397 public void setRestApiService(IRestApiService restApi) {
398 this.restApi = restApi;
399 }
400
401 public void setThreadPoolService(IThreadPoolService tp) {
402 this.threadPool = tp;
403 }
404
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800405 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800406 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800407 }
408
Jonathan Hart8a5d0972013-12-04 10:02:44 -0800409 public void setLinkDiscoveryService(ILinkDiscoveryService linkDiscovery) {
410 this.linkDiscovery = linkDiscovery;
411 }
412
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800413 @Override
414 public Role getRole() {
415 synchronized(roleChanger) {
416 return role;
417 }
418 }
419
420 @Override
421 public void setRole(Role role) {
422 if (role == null) throw new NullPointerException("Role can not be null.");
423 if (role == Role.MASTER && this.role == Role.SLAVE) {
424 // Reset db state to Inactive for all switches.
425 updateAllInactiveSwitchInfo();
426 }
427
428 // Need to synchronize to ensure a reliable ordering on role request
429 // messages send and to ensure the list of connected switches is stable
430 // RoleChanger will handle the actual sending of the message and
431 // timeout handling
432 // @see RoleChanger
433 synchronized(roleChanger) {
434 if (role.equals(this.role)) {
435 log.debug("Ignoring role change: role is already {}", role);
436 return;
437 }
438
439 Role oldRole = this.role;
440 this.role = role;
441
442 log.debug("Submitting role change request to role {}", role);
443 roleChanger.submitRequest(connectedSwitches, role);
444
445 // Enqueue an update for our listeners.
446 try {
447 this.updates.put(new HARoleUpdate(role, oldRole));
448 } catch (InterruptedException e) {
449 log.error("Failure adding update to queue", e);
450 }
451 }
452 }
453
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700454 public void publishUpdate(IUpdate update) {
455 try {
456 this.updates.put(update);
457 } catch (InterruptedException e) {
458 log.error("Failure adding update to queue", e);
459 }
460 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800461
462 // **********************
463 // ChannelUpstreamHandler
464 // **********************
465
466 /**
467 * Return a new channel handler for processing a switch connections
468 * @param state The channel state object for the connection
469 * @return the new channel handler
470 */
471 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
472 return new OFChannelHandler(state);
473 }
474
Jonathan Hartcc957a02013-02-26 10:39:04 -0800475 protected class RoleChangeCallback implements ControlChangeCallback {
476 @Override
477 public void controlChanged(long dpid, boolean hasControl) {
478 log.info("Role change callback for switch {}, hasControl {}",
479 HexString.toHexString(dpid), hasControl);
480
481 synchronized(roleChanger){
482 OFSwitchImpl sw = null;
483 for (OFSwitchImpl connectedSw : connectedSwitches){
484 if (connectedSw.getId() == dpid){
485 sw = connectedSw;
486 break;
487 }
488 }
489 if (sw == null){
490 log.warn("Switch {} not found in connected switches",
491 HexString.toHexString(dpid));
492 return;
493 }
494
495 Role role = null;
496
Pankaj Berde01939e92013-03-08 14:38:27 -0800497 /*
498 * issue #229
499 * Cannot rely on sw.getRole() as it can be behind due to pending
500 * role changes in the queue. Just submit it and late the RoleChanger
501 * handle duplicates.
502 */
503
504 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800505 role = Role.MASTER;
506 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800507 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800508 role = Role.SLAVE;
509 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800510
511 log.debug("Sending role request {} msg to {}", role, sw);
512 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
513 swList.add(sw);
514 roleChanger.submitRequest(swList, role);
515
Jonathan Hartcc957a02013-02-26 10:39:04 -0800516 }
517
518 }
519 }
520
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800521 /**
522 * Channel handler deals with the switch connection and dispatches
523 * switch messages to the appropriate locations.
524 * @author readams
525 */
526 protected class OFChannelHandler
527 extends IdleStateAwareChannelUpstreamHandler {
528 protected OFSwitchImpl sw;
529 protected OFChannelState state;
530
531 public OFChannelHandler(OFChannelState state) {
532 this.state = state;
533 }
534
535 @Override
536 @LogMessageDoc(message="New switch connection from {ip address}",
537 explanation="A new switch has connected from the " +
538 "specified IP address")
539 public void channelConnected(ChannelHandlerContext ctx,
540 ChannelStateEvent e) throws Exception {
541 log.info("New switch connection from {}",
542 e.getChannel().getRemoteAddress());
543
544 sw = new OFSwitchImpl();
545 sw.setChannel(e.getChannel());
546 sw.setFloodlightProvider(Controller.this);
547 sw.setThreadPoolService(threadPool);
548
549 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
550 msglist.add(factory.getMessage(OFType.HELLO));
551 e.getChannel().write(msglist);
552
553 }
554
555 @Override
556 @LogMessageDoc(message="Disconnected switch {switch information}",
557 explanation="The specified switch has disconnected.")
558 public void channelDisconnected(ChannelHandlerContext ctx,
559 ChannelStateEvent e) throws Exception {
560 if (sw != null && state.hsState == HandshakeState.READY) {
561 if (activeSwitches.containsKey(sw.getId())) {
562 // It's safe to call removeSwitch even though the map might
563 // not contain this particular switch but another with the
564 // same DPID
565 removeSwitch(sw);
566 }
567 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700568 if (controlRequested) {
569 registryService.releaseControl(sw.getId());
570 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800571 connectedSwitches.remove(sw);
572 }
573 sw.setConnected(false);
574 }
575 log.info("Disconnected switch {}", sw);
576 }
577
578 @Override
579 @LogMessageDocs({
580 @LogMessageDoc(level="ERROR",
581 message="Disconnecting switch {switch} due to read timeout",
582 explanation="The connected switch has failed to send any " +
583 "messages or respond to echo requests",
584 recommendation=LogMessageDoc.CHECK_SWITCH),
585 @LogMessageDoc(level="ERROR",
586 message="Disconnecting switch {switch}: failed to " +
587 "complete handshake",
588 explanation="The switch did not respond correctly " +
589 "to handshake messages",
590 recommendation=LogMessageDoc.CHECK_SWITCH),
591 @LogMessageDoc(level="ERROR",
592 message="Disconnecting switch {switch} due to IO Error: {}",
593 explanation="There was an error communicating with the switch",
594 recommendation=LogMessageDoc.CHECK_SWITCH),
595 @LogMessageDoc(level="ERROR",
596 message="Disconnecting switch {switch} due to switch " +
597 "state error: {error}",
598 explanation="The switch sent an unexpected message",
599 recommendation=LogMessageDoc.CHECK_SWITCH),
600 @LogMessageDoc(level="ERROR",
601 message="Disconnecting switch {switch} due to " +
602 "message parse failure",
603 explanation="Could not parse a message from the switch",
604 recommendation=LogMessageDoc.CHECK_SWITCH),
605 @LogMessageDoc(level="ERROR",
606 message="Terminating controller due to storage exception",
607 explanation=ERROR_DATABASE,
608 recommendation=LogMessageDoc.CHECK_CONTROLLER),
609 @LogMessageDoc(level="ERROR",
610 message="Could not process message: queue full",
611 explanation="OpenFlow messages are arriving faster than " +
612 " the controller can process them.",
613 recommendation=LogMessageDoc.CHECK_CONTROLLER),
614 @LogMessageDoc(level="ERROR",
615 message="Error while processing message " +
616 "from switch {switch} {cause}",
617 explanation="An error occurred processing the switch message",
618 recommendation=LogMessageDoc.GENERIC_ACTION)
619 })
620 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
621 throws Exception {
622 if (e.getCause() instanceof ReadTimeoutException) {
623 // switch timeout
624 log.error("Disconnecting switch {} due to read timeout", sw);
625 ctx.getChannel().close();
626 } else if (e.getCause() instanceof HandshakeTimeoutException) {
627 log.error("Disconnecting switch {}: failed to complete handshake",
628 sw);
629 ctx.getChannel().close();
630 } else if (e.getCause() instanceof ClosedChannelException) {
631 //log.warn("Channel for sw {} already closed", sw);
632 } else if (e.getCause() instanceof IOException) {
633 log.error("Disconnecting switch {} due to IO Error: {}",
634 sw, e.getCause().getMessage());
635 ctx.getChannel().close();
636 } else if (e.getCause() instanceof SwitchStateException) {
637 log.error("Disconnecting switch {} due to switch state error: {}",
638 sw, e.getCause().getMessage());
639 ctx.getChannel().close();
640 } else if (e.getCause() instanceof MessageParseException) {
641 log.error("Disconnecting switch " + sw +
642 " due to message parse failure",
643 e.getCause());
644 ctx.getChannel().close();
645 } else if (e.getCause() instanceof StorageException) {
646 log.error("Terminating controller due to storage exception",
647 e.getCause());
648 terminate();
649 } else if (e.getCause() instanceof RejectedExecutionException) {
650 log.warn("Could not process message: queue full");
651 } else {
652 log.error("Error while processing message from switch " + sw,
653 e.getCause());
654 ctx.getChannel().close();
655 }
656 }
657
658 @Override
659 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
660 throws Exception {
661 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
662 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
663 e.getChannel().write(msglist);
664 }
665
666 @Override
667 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
668 throws Exception {
669 if (e.getMessage() instanceof List) {
670 @SuppressWarnings("unchecked")
671 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
672
673 for (OFMessage ofm : msglist) {
674 try {
675 processOFMessage(ofm);
676 }
677 catch (Exception ex) {
678 // We are the last handler in the stream, so run the
679 // exception through the channel again by passing in
680 // ctx.getChannel().
681 Channels.fireExceptionCaught(ctx.getChannel(), ex);
682 }
683 }
684
685 // Flush all flow-mods/packet-out generated from this "train"
686 OFSwitchImpl.flush_all();
687 }
688 }
689
690 /**
691 * Process the request for the switch description
692 */
693 @LogMessageDoc(level="ERROR",
694 message="Exception in reading description " +
695 " during handshake {exception}",
696 explanation="Could not process the switch description string",
697 recommendation=LogMessageDoc.CHECK_SWITCH)
698 void processSwitchDescReply() {
699 try {
700 // Read description, if it has been updated
701 @SuppressWarnings("unchecked")
702 Future<List<OFStatistics>> desc_future =
703 (Future<List<OFStatistics>>)sw.
704 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
705 List<OFStatistics> values =
706 desc_future.get(0, TimeUnit.MILLISECONDS);
707 if (values != null) {
708 OFDescriptionStatistics description =
709 new OFDescriptionStatistics();
710 ChannelBuffer data =
711 ChannelBuffers.buffer(description.getLength());
712 for (OFStatistics f : values) {
713 f.writeTo(data);
714 description.readFrom(data);
715 break; // SHOULD be a list of length 1
716 }
717 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
718 description);
719 sw.setSwitchProperties(description);
720 data = null;
721
722 // At this time, also set other switch properties from storage
723 boolean is_core_switch = false;
724 IResultSet resultSet = null;
725 try {
726 String swid = sw.getStringId();
727 resultSet =
728 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
729 for (Iterator<IResultSet> it =
730 resultSet.iterator(); it.hasNext();) {
731 // In case of multiple rows, use the status
732 // in last row?
733 Map<String, Object> row = it.next().getRow();
734 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
735 if (log.isDebugEnabled()) {
736 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
737 "config for switch={}, is-core={}",
738 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
739 }
740 String ics =
741 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
742 is_core_switch = ics.equals("true");
743 }
744 }
745 }
746 finally {
747 if (resultSet != null)
748 resultSet.close();
749 }
750 if (is_core_switch) {
751 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
Yuta HIGUCHIe6a7aa72013-10-14 15:54:03 -0700752 true);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800753 }
754 }
755 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
756 state.hasDescription = true;
757 checkSwitchReady();
758 }
759 catch (InterruptedException ex) {
760 // Ignore
761 }
762 catch (TimeoutException ex) {
763 // Ignore
764 } catch (Exception ex) {
765 log.error("Exception in reading description " +
766 " during handshake", ex);
767 }
768 }
769
770 /**
771 * Send initial switch setup information that we need before adding
772 * the switch
773 * @throws IOException
774 */
775 void sendHelloConfiguration() throws IOException {
776 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700777 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800778 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
779 }
780
781 /**
782 * Send the configuration requests we can only do after we have
783 * the features reply
784 * @throws IOException
785 */
786 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700787 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800788 // Ensure we receive the full packet via PacketIn
789 OFSetConfig config = (OFSetConfig) factory
790 .getMessage(OFType.SET_CONFIG);
791 config.setMissSendLength((short) 0xffff)
792 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
793 sw.write(config, null);
794 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
795 null);
796
797 // Get Description to set switch-specific flags
798 OFStatisticsRequest req = new OFStatisticsRequest();
799 req.setStatisticType(OFStatisticsType.DESC);
800 req.setLengthU(req.getLengthU());
801 Future<List<OFStatistics>> dfuture =
802 sw.getStatistics(req);
803 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
804 dfuture);
805
806 }
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700807
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700808 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800809 protected void checkSwitchReady() {
810 if (state.hsState == HandshakeState.FEATURES_REPLY &&
811 state.hasDescription && state.hasGetConfigReply) {
812
813 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700814 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800815
816 synchronized(roleChanger) {
817 // We need to keep track of all of the switches that are connected
818 // to the controller, in any role, so that we can later send the
819 // role request messages when the controller role changes.
820 // We need to be synchronized while doing this: we must not
821 // send a another role request to the connectedSwitches until
822 // we were able to add this new switch to connectedSwitches
823 // *and* send the current role to the new switch.
824 connectedSwitches.add(sw);
825
826 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800827 //Put the switch in SLAVE mode until we know we have control
828 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
829 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
830 swList.add(sw);
831 roleChanger.submitRequest(swList, Role.SLAVE);
832
Jonathan Hartcc957a02013-02-26 10:39:04 -0800833 //Request control of the switch from the global registry
834 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700835 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800836 registryService.requestControl(sw.getId(),
837 new RoleChangeCallback());
838 } catch (RegistryException e) {
839 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700840 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800841 }
842
Jonathan Hart97801ac2013-02-26 14:29:16 -0800843
Jonathan Hartcc957a02013-02-26 10:39:04 -0800844
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800845 // Send a role request if role support is enabled for the controller
846 // This is a probe that we'll use to determine if the switch
847 // actually supports the role request message. If it does we'll
848 // get back a role reply message. If it doesn't we'll get back an
849 // OFError message.
850 // If role is MASTER we will promote switch to active
851 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800852 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800853 log.debug("This controller's role is {}, " +
854 "sending initial role request msg to {}",
855 role, sw);
856 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
857 swList.add(sw);
858 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800859 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800860 }
861 else {
862 // Role supported not enabled on controller (for now)
863 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800864 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800865 "not sending role request msg to {}",
866 role, sw);
867 // Need to clear FlowMods before we add the switch
868 // and dispatch updates otherwise we have a race condition.
869 sw.clearAllFlowMods();
870 addSwitch(sw);
871 state.firstRoleReplyReceived = true;
872 }
873 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700874 if (!controlRequested) {
875 // yield to allow other thread(s) to release control
876 try {
877 Thread.sleep(10);
878 } catch (InterruptedException e) {
879 // Ignore interruptions
880 }
881 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700882 log.debug("Closing {} because we weren't able to request control " +
883 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700884 sw.channel.close();
885 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800886 }
887 }
888
889 /* Handle a role reply message we received from the switch. Since
890 * netty serializes message dispatch we don't need to synchronize
891 * against other receive operations from the same switch, so no need
892 * to synchronize addSwitch(), removeSwitch() operations from the same
893 * connection.
894 * FIXME: However, when a switch with the same DPID connects we do
895 * need some synchronization. However, handling switches with same
896 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
897 * removedSwitch notification):1
898 *
899 */
900 @LogMessageDoc(level="ERROR",
901 message="Invalid role value in role reply message",
902 explanation="Was unable to set the HA role (master or slave) " +
903 "for the controller.",
904 recommendation=LogMessageDoc.CHECK_CONTROLLER)
905 protected void handleRoleReplyMessage(OFVendor vendorMessage,
906 OFRoleReplyVendorData roleReplyVendorData) {
907 // Map from the role code in the message to our role enum
908 int nxRole = roleReplyVendorData.getRole();
909 Role role = null;
910 switch (nxRole) {
911 case OFRoleVendorData.NX_ROLE_OTHER:
912 role = Role.EQUAL;
913 break;
914 case OFRoleVendorData.NX_ROLE_MASTER:
915 role = Role.MASTER;
916 break;
917 case OFRoleVendorData.NX_ROLE_SLAVE:
918 role = Role.SLAVE;
919 break;
920 default:
921 log.error("Invalid role value in role reply message");
922 sw.getChannel().close();
923 return;
924 }
925
926 log.debug("Handling role reply for role {} from {}. " +
927 "Controller's role is {} ",
928 new Object[] { role, sw, Controller.this.role}
929 );
930
931 sw.deliverRoleReply(vendorMessage.getXid(), role);
932
933 boolean isActive = activeSwitches.containsKey(sw.getId());
934 if (!isActive && sw.isActive()) {
935 // Transition from SLAVE to MASTER.
936
937 if (!state.firstRoleReplyReceived ||
938 getAlwaysClearFlowsOnSwAdd()) {
939 // This is the first role-reply message we receive from
940 // this switch or roles were disabled when the switch
941 // connected:
942 // Delete all pre-existing flows for new connections to
943 // the master
944 //
945 // FIXME: Need to think more about what the test should
946 // be for when we flush the flow-table? For example,
947 // if all the controllers are temporarily in the backup
948 // role (e.g. right after a failure of the master
949 // controller) at the point the switch connects, then
950 // all of the controllers will initially connect as
951 // backup controllers and not flush the flow-table.
952 // Then when one of them is promoted to master following
953 // the master controller election the flow-table
954 // will still not be flushed because that's treated as
955 // a failover event where we don't want to flush the
956 // flow-table. The end result would be that the flow
957 // table for a newly connected switch is never
958 // flushed. Not sure how to handle that case though...
959 sw.clearAllFlowMods();
960 log.debug("First role reply from master switch {}, " +
961 "clear FlowTable to active switch list",
962 HexString.toHexString(sw.getId()));
963 }
964
965 // Some switches don't seem to update us with port
966 // status messages while in slave role.
967 readSwitchPortStateFromStorage(sw);
968
969 // Only add the switch to the active switch list if
970 // we're not in the slave role. Note that if the role
971 // attribute is null, then that means that the switch
972 // doesn't support the role request messages, so in that
973 // case we're effectively in the EQUAL role and the
974 // switch should be included in the active switch list.
975 addSwitch(sw);
976 log.debug("Added master switch {} to active switch list",
977 HexString.toHexString(sw.getId()));
978
979 }
980 else if (isActive && !sw.isActive()) {
981 // Transition from MASTER to SLAVE: remove switch
982 // from active switch list.
983 log.debug("Removed slave switch {} from active switch" +
984 " list", HexString.toHexString(sw.getId()));
985 removeSwitch(sw);
986 }
987
988 // Indicate that we have received a role reply message.
989 state.firstRoleReplyReceived = true;
990 }
991
992 protected boolean handleVendorMessage(OFVendor vendorMessage) {
993 boolean shouldHandleMessage = false;
994 int vendor = vendorMessage.getVendor();
995 switch (vendor) {
996 case OFNiciraVendorData.NX_VENDOR_ID:
997 OFNiciraVendorData niciraVendorData =
998 (OFNiciraVendorData)vendorMessage.getVendorData();
999 int dataType = niciraVendorData.getDataType();
1000 switch (dataType) {
1001 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
1002 OFRoleReplyVendorData roleReplyVendorData =
1003 (OFRoleReplyVendorData) niciraVendorData;
1004 handleRoleReplyMessage(vendorMessage,
1005 roleReplyVendorData);
1006 break;
1007 default:
1008 log.warn("Unhandled Nicira VENDOR message; " +
1009 "data type = {}", dataType);
1010 break;
1011 }
1012 break;
1013 default:
1014 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
1015 break;
1016 }
1017
1018 return shouldHandleMessage;
1019 }
1020
1021 /**
1022 * Dispatch an Openflow message from a switch to the appropriate
1023 * handler.
1024 * @param m The message to process
1025 * @throws IOException
1026 * @throws SwitchStateException
1027 */
1028 @LogMessageDocs({
1029 @LogMessageDoc(level="WARN",
1030 message="Config Reply from {switch} has " +
1031 "miss length set to {length}",
1032 explanation="The controller requires that the switch " +
1033 "use a miss length of 0xffff for correct " +
1034 "function",
1035 recommendation="Use a different switch to ensure " +
1036 "correct function"),
1037 @LogMessageDoc(level="WARN",
1038 message="Received ERROR from sw {switch} that "
1039 +"indicates roles are not supported "
1040 +"but we have received a valid "
1041 +"role reply earlier",
1042 explanation="The switch sent a confusing message to the" +
1043 "controller")
1044 })
1045 protected void processOFMessage(OFMessage m)
1046 throws IOException, SwitchStateException {
1047 boolean shouldHandleMessage = false;
1048
1049 switch (m.getType()) {
1050 case HELLO:
1051 if (log.isTraceEnabled())
1052 log.trace("HELLO from {}", sw);
1053
1054 if (state.hsState.equals(HandshakeState.START)) {
1055 state.hsState = HandshakeState.HELLO;
1056 sendHelloConfiguration();
1057 } else {
1058 throw new SwitchStateException("Unexpected HELLO from "
1059 + sw);
1060 }
1061 break;
1062 case ECHO_REQUEST:
1063 OFEchoReply reply =
1064 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
1065 reply.setXid(m.getXid());
1066 sw.write(reply, null);
1067 break;
1068 case ECHO_REPLY:
1069 break;
1070 case FEATURES_REPLY:
1071 if (log.isTraceEnabled())
1072 log.trace("Features Reply from {}", sw);
1073
1074 sw.setFeaturesReply((OFFeaturesReply) m);
1075 if (state.hsState.equals(HandshakeState.HELLO)) {
1076 sendFeatureReplyConfiguration();
1077 state.hsState = HandshakeState.FEATURES_REPLY;
1078 // uncomment to enable "dumb" switches like cbench
1079 // state.hsState = HandshakeState.READY;
1080 // addSwitch(sw);
1081 } else {
1082 // return results to rest api caller
1083 sw.deliverOFFeaturesReply(m);
1084 // update database */
1085 updateActiveSwitchInfo(sw);
1086 }
1087 break;
1088 case GET_CONFIG_REPLY:
1089 if (log.isTraceEnabled())
1090 log.trace("Get config reply from {}", sw);
1091
1092 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
1093 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
1094 throw new SwitchStateException(em);
1095 }
1096 OFGetConfigReply cr = (OFGetConfigReply) m;
1097 if (cr.getMissSendLength() == (short)0xffff) {
1098 log.trace("Config Reply from {} confirms " +
1099 "miss length set to 0xffff", sw);
1100 } else {
1101 log.warn("Config Reply from {} has " +
1102 "miss length set to {}",
1103 sw, cr.getMissSendLength() & 0xffff);
1104 }
1105 state.hasGetConfigReply = true;
1106 checkSwitchReady();
1107 break;
1108 case VENDOR:
1109 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1110 break;
1111 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001112 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001113 // TODO: we need better error handling. Especially for
1114 // request/reply style message (stats, roles) we should have
1115 // a unified way to lookup the xid in the error message.
1116 // This will probable involve rewriting the way we handle
1117 // request/reply style messages.
1118 OFError error = (OFError) m;
1119 boolean shouldLogError = true;
1120 // TODO: should we check that firstRoleReplyReceived is false,
1121 // i.e., check only whether the first request fails?
1122 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1123 boolean isBadVendorError =
1124 (error.getErrorType() == OFError.OFErrorType.
1125 OFPET_BAD_REQUEST.getValue());
1126 // We expect to receive a bad vendor error when
1127 // we're connected to a switch that doesn't support
1128 // the Nicira vendor extensions (i.e. not OVS or
1129 // derived from OVS). By protocol, it should also be
1130 // BAD_VENDOR, but too many switch implementations
1131 // get it wrong and we can already check the xid()
1132 // so we can ignore the type with confidence that this
1133 // is not a spurious error
1134 shouldLogError = !isBadVendorError;
1135 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001136 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001137 if (state.firstRoleReplyReceived && (role != null)) {
1138 log.warn("Received ERROR from sw {} that "
1139 +"indicates roles are not supported "
1140 +"but we have received a valid "
1141 +"role reply earlier", sw);
1142 }
1143 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001144 Role requestedRole =
HIGUCHI Yutaeae374d2013-06-17 10:39:42 -07001145 sw.deliverRoleRequestNotSupportedEx(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001146 synchronized(roleChanger) {
1147 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001148 //This will now never happen. The Controller's role
1149 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001150 // the switch doesn't understand role request
1151 // messages and the current controller role is
1152 // slave. We need to disconnect the switch.
1153 // @see RoleChanger for rationale
Jonathan Hart9e92c512013-03-20 16:24:44 -07001154 log.warn("Closing {} channel because controller's role " +
1155 "is SLAVE", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001156 sw.getChannel().close();
1157 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001158 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001159 log.debug("Adding switch {} because we got an error" +
1160 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001161 // Controller's role is master: add to
1162 // active
1163 // TODO: check if clearing flow table is
1164 // right choice here.
1165 // Need to clear FlowMods before we add the switch
1166 // and dispatch updates otherwise we have a race condition.
1167 // TODO: switch update is async. Won't we still have a potential
1168 // race condition?
1169 sw.clearAllFlowMods();
1170 addSwitch(sw);
1171 }
1172 }
1173 }
1174 else {
1175 // TODO: Is this the right thing to do if we receive
1176 // some other error besides a bad vendor error?
1177 // Presumably that means the switch did actually
1178 // understand the role request message, but there
1179 // was some other error from processing the message.
1180 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1181 // error code, but it doesn't look like the Nicira
1182 // role request has that. Should check OVS source
1183 // code to see if it's possible for any other errors
1184 // to be returned.
1185 // If we received an error the switch is not
1186 // in the correct role, so we need to disconnect it.
1187 // We could also resend the request but then we need to
1188 // check if there are other pending request in which
1189 // case we shouldn't resend. If we do resend we need
1190 // to make sure that the switch eventually accepts one
1191 // of our requests or disconnect the switch. This feels
1192 // cumbersome.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001193 log.debug("Closing {} channel because we recieved an " +
1194 "error other than BAD_VENDOR", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001195 sw.getChannel().close();
1196 }
1197 }
1198 // Once we support OF 1.2, we'd add code to handle it here.
1199 //if (error.getXid() == state.ofRoleRequestXid) {
1200 //}
1201 if (shouldLogError)
1202 logError(sw, error);
1203 break;
1204 case STATS_REPLY:
1205 if (state.hsState.ordinal() <
1206 HandshakeState.FEATURES_REPLY.ordinal()) {
1207 String em = "Unexpected STATS_REPLY from " + sw;
1208 throw new SwitchStateException(em);
1209 }
1210 sw.deliverStatisticsReply(m);
1211 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1212 processSwitchDescReply();
1213 }
1214 break;
1215 case PORT_STATUS:
1216 // We want to update our port state info even if we're in
1217 // the slave role, but we only want to update storage if
1218 // we're the master (or equal).
1219 boolean updateStorage = state.hsState.
1220 equals(HandshakeState.READY) &&
1221 (sw.getRole() != Role.SLAVE);
1222 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1223 shouldHandleMessage = true;
1224 break;
1225
1226 default:
1227 shouldHandleMessage = true;
1228 break;
1229 }
1230
1231 if (shouldHandleMessage) {
1232 sw.getListenerReadLock().lock();
1233 try {
1234 if (sw.isConnected()) {
1235 if (!state.hsState.equals(HandshakeState.READY)) {
1236 log.debug("Ignoring message type {} received " +
1237 "from switch {} before switch is " +
1238 "fully configured.", m.getType(), sw);
1239 }
1240 // Check if the controller is in the slave role for the
1241 // switch. If it is, then don't dispatch the message to
1242 // the listeners.
1243 // TODO: Should we dispatch messages that we expect to
1244 // receive when we're in the slave role, e.g. port
1245 // status messages? Since we're "hiding" switches from
1246 // the listeners when we're in the slave role, then it
1247 // seems a little weird to dispatch port status messages
1248 // to them. On the other hand there might be special
1249 // modules that care about all of the connected switches
1250 // and would like to receive port status notifications.
1251 else if (sw.getRole() == Role.SLAVE) {
1252 // Don't log message if it's a port status message
1253 // since we expect to receive those from the switch
1254 // and don't want to emit spurious messages.
1255 if (m.getType() != OFType.PORT_STATUS) {
1256 log.debug("Ignoring message type {} received " +
1257 "from switch {} while in the slave role.",
1258 m.getType(), sw);
1259 }
1260 } else {
1261 handleMessage(sw, m, null);
1262 }
1263 }
1264 }
1265 finally {
1266 sw.getListenerReadLock().unlock();
1267 }
1268 }
1269 }
1270 }
1271
1272 // ****************
1273 // Message handlers
1274 // ****************
1275
1276 protected void handlePortStatusMessage(IOFSwitch sw,
1277 OFPortStatus m,
1278 boolean updateStorage) {
1279 short portNumber = m.getDesc().getPortNumber();
1280 OFPhysicalPort port = m.getDesc();
1281 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001282 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1283 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001284 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001285 if (!portDown) {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001286 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1287 try {
1288 this.updates.put(update);
1289 } catch (InterruptedException e) {
1290 log.error("Failure adding update to queue", e);
1291 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001292 } else {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001293 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1294 try {
1295 this.updates.put(update);
1296 } catch (InterruptedException e) {
1297 log.error("Failure adding update to queue", e);
1298 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001299 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001300 if (updateStorage)
1301 updatePortInfo(sw, port);
1302 log.debug("Port #{} modified for {}", portNumber, sw);
1303 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
Jonathan Hart8a5d0972013-12-04 10:02:44 -08001304 // XXX Workaround to prevent race condition where a link is detected
1305 // and attempted to be written to the database before the port is in
1306 // the database. We now suppress link discovery on ports until we're
1307 // sure they're in the database.
1308 linkDiscovery.AddToSuppressLLDPs(sw.getId(), port.getPortNumber());
1309
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001310 sw.setPort(port);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001311 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1312 try {
1313 this.updates.put(update);
1314 } catch (InterruptedException e) {
1315 log.error("Failure adding update to queue", e);
1316 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001317 if (updateStorage)
1318 updatePortInfo(sw, port);
1319 log.debug("Port #{} added for {}", portNumber, sw);
1320 } else if (m.getReason() ==
1321 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1322 sw.deletePort(portNumber);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001323 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1324 try {
1325 this.updates.put(update);
1326 } catch (InterruptedException e) {
1327 log.error("Failure adding update to queue", e);
1328 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001329 if (updateStorage)
1330 removePortInfo(sw, portNumber);
1331 log.debug("Port #{} deleted for {}", portNumber, sw);
1332 }
1333 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1334 try {
1335 this.updates.put(update);
1336 } catch (InterruptedException e) {
1337 log.error("Failure adding update to queue", e);
1338 }
1339 }
1340
1341 /**
1342 * flcontext_cache - Keep a thread local stack of contexts
1343 */
1344 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1345 new ThreadLocal <Stack<FloodlightContext>> () {
1346 @Override
1347 protected Stack<FloodlightContext> initialValue() {
1348 return new Stack<FloodlightContext>();
1349 }
1350 };
1351
1352 /**
1353 * flcontext_alloc - pop a context off the stack, if required create a new one
1354 * @return FloodlightContext
1355 */
1356 protected static FloodlightContext flcontext_alloc() {
1357 FloodlightContext flcontext = null;
1358
1359 if (flcontext_cache.get().empty()) {
1360 flcontext = new FloodlightContext();
1361 }
1362 else {
1363 flcontext = flcontext_cache.get().pop();
1364 }
1365
1366 return flcontext;
1367 }
1368
1369 /**
1370 * flcontext_free - Free the context to the current thread
1371 * @param flcontext
1372 */
1373 protected void flcontext_free(FloodlightContext flcontext) {
1374 flcontext.getStorage().clear();
1375 flcontext_cache.get().push(flcontext);
1376 }
1377
1378 /**
1379 * Handle replies to certain OFMessages, and pass others off to listeners
1380 * @param sw The switch for the message
1381 * @param m The message
1382 * @param bContext The floodlight context. If null then floodlight context would
1383 * be allocated in this function
1384 * @throws IOException
1385 */
1386 @LogMessageDocs({
1387 @LogMessageDoc(level="ERROR",
1388 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1389 " field is empty.",
1390 explanation="The switch sent an improperly-formatted PacketIn" +
1391 " message",
1392 recommendation=LogMessageDoc.CHECK_SWITCH),
1393 @LogMessageDoc(level="WARN",
1394 message="Unhandled OF Message: {} from {}",
1395 explanation="The switch sent a message not handled by " +
1396 "the controller")
1397 })
1398 protected void handleMessage(IOFSwitch sw, OFMessage m,
1399 FloodlightContext bContext)
1400 throws IOException {
1401 Ethernet eth = null;
1402
1403 switch (m.getType()) {
1404 case PACKET_IN:
1405 OFPacketIn pi = (OFPacketIn)m;
1406
1407 if (pi.getPacketData().length <= 0) {
1408 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1409 ") because the data field is empty.");
1410 return;
1411 }
1412
1413 if (Controller.ALWAYS_DECODE_ETH) {
1414 eth = new Ethernet();
1415 eth.deserialize(pi.getPacketData(), 0,
1416 pi.getPacketData().length);
1417 counterStore.updatePacketInCounters(sw, m, eth);
1418 }
1419 // fall through to default case...
1420
1421 default:
1422
1423 List<IOFMessageListener> listeners = null;
1424 if (messageListeners.containsKey(m.getType())) {
1425 listeners = messageListeners.get(m.getType()).
1426 getOrderedListeners();
1427 }
1428
1429 FloodlightContext bc = null;
1430 if (listeners != null) {
1431 // Check if floodlight context is passed from the calling
1432 // function, if so use that floodlight context, otherwise
1433 // allocate one
1434 if (bContext == null) {
1435 bc = flcontext_alloc();
1436 } else {
1437 bc = bContext;
1438 }
1439 if (eth != null) {
1440 IFloodlightProviderService.bcStore.put(bc,
1441 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1442 eth);
1443 }
1444
1445 // Get the starting time (overall and per-component) of
1446 // the processing chain for this packet if performance
1447 // monitoring is turned on
mininet73e7fb72013-12-03 14:25:53 -08001448
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001449 Command cmd;
1450 for (IOFMessageListener listener : listeners) {
1451 if (listener instanceof IOFSwitchFilter) {
1452 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1453 continue;
1454 }
1455 }
1456
mininet73e7fb72013-12-03 14:25:53 -08001457
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001458 cmd = listener.receive(sw, m, bc);
mininet73e7fb72013-12-03 14:25:53 -08001459
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001460
1461 if (Command.STOP.equals(cmd)) {
1462 break;
1463 }
1464 }
mininet73e7fb72013-12-03 14:25:53 -08001465
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001466 } else {
1467 log.warn("Unhandled OF Message: {} from {}", m, sw);
1468 }
1469
1470 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1471 }
1472 }
1473
1474 /**
1475 * Log an OpenFlow error message from a switch
1476 * @param sw The switch that sent the error
1477 * @param error The error message
1478 */
1479 @LogMessageDoc(level="ERROR",
1480 message="Error {error type} {error code} from {switch}",
1481 explanation="The switch responded with an unexpected error" +
1482 "to an OpenFlow message from the controller",
1483 recommendation="This could indicate improper network operation. " +
1484 "If the problem persists restarting the switch and " +
1485 "controller may help."
1486 )
1487 protected void logError(IOFSwitch sw, OFError error) {
1488 int etint = 0xffff & error.getErrorType();
1489 if (etint < 0 || etint >= OFErrorType.values().length) {
1490 log.error("Unknown error code {} from sw {}", etint, sw);
1491 }
1492 OFErrorType et = OFErrorType.values()[etint];
1493 switch (et) {
1494 case OFPET_HELLO_FAILED:
1495 OFHelloFailedCode hfc =
1496 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1497 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1498 break;
1499 case OFPET_BAD_REQUEST:
1500 OFBadRequestCode brc =
1501 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1502 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1503 break;
1504 case OFPET_BAD_ACTION:
1505 OFBadActionCode bac =
1506 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1507 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1508 break;
1509 case OFPET_FLOW_MOD_FAILED:
1510 OFFlowModFailedCode fmfc =
1511 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1512 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1513 break;
1514 case OFPET_PORT_MOD_FAILED:
1515 OFPortModFailedCode pmfc =
1516 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1517 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1518 break;
1519 case OFPET_QUEUE_OP_FAILED:
1520 OFQueueOpFailedCode qofc =
1521 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1522 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1523 break;
1524 default:
1525 break;
1526 }
1527 }
1528
1529 /**
1530 * Add a switch to the active switch list and call the switch listeners.
1531 * This happens either when a switch first connects (and the controller is
1532 * not in the slave role) or when the role of the controller changes from
1533 * slave to master.
1534 * @param sw the switch that has been added
1535 */
1536 // TODO: need to rethink locking and the synchronous switch update.
1537 // We can / should also handle duplicate DPIDs in connectedSwitches
1538 @LogMessageDoc(level="ERROR",
1539 message="New switch added {switch} for already-added switch {switch}",
1540 explanation="A switch with the same DPID as another switch " +
1541 "connected to the controller. This can be caused by " +
1542 "multiple switches configured with the same DPID, or " +
1543 "by a switch reconnected very quickly after " +
1544 "disconnecting.",
1545 recommendation="If this happens repeatedly, it is likely there " +
1546 "are switches with duplicate DPIDs on the network. " +
1547 "Reconfigure the appropriate switches. If it happens " +
1548 "very rarely, then it is likely this is a transient " +
1549 "network problem that can be ignored."
1550 )
1551 protected void addSwitch(IOFSwitch sw) {
Jonathan Hart8a5d0972013-12-04 10:02:44 -08001552 // XXX Workaround to prevent race condition where a link is detected
1553 // and attempted to be written to the database before the port is in
1554 // the database. We now suppress link discovery on ports until we're
1555 // sure they're in the database.
1556 for (OFPhysicalPort port : sw.getPorts()) {
1557 linkDiscovery.AddToSuppressLLDPs(sw.getId(), port.getPortNumber());
1558 }
1559
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001560 // TODO: is it safe to modify the HashMap without holding
1561 // the old switch's lock?
1562 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1563 if (sw == oldSw) {
1564 // Note == for object equality, not .equals for value
1565 log.info("New add switch for pre-existing switch {}", sw);
1566 return;
1567 }
1568
1569 if (oldSw != null) {
1570 oldSw.getListenerWriteLock().lock();
1571 try {
1572 log.error("New switch added {} for already-added switch {}",
1573 sw, oldSw);
1574 // Set the connected flag to false to suppress calling
1575 // the listeners for this switch in processOFMessage
1576 oldSw.setConnected(false);
1577
1578 oldSw.cancelAllStatisticsReplies();
1579
1580 updateInactiveSwitchInfo(oldSw);
1581
1582 // we need to clean out old switch state definitively
1583 // before adding the new switch
1584 // FIXME: It seems not completely kosher to call the
1585 // switch listeners here. I thought one of the points of
1586 // having the asynchronous switch update mechanism was so
1587 // the addedSwitch and removedSwitch were always called
1588 // from a single thread to simplify concurrency issues
1589 // for the listener.
1590 if (switchListeners != null) {
1591 for (IOFSwitchListener listener : switchListeners) {
1592 listener.removedSwitch(oldSw);
1593 }
1594 }
1595 // will eventually trigger a removeSwitch(), which will cause
1596 // a "Not removing Switch ... already removed debug message.
1597 // TODO: Figure out a way to handle this that avoids the
1598 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001599 log.debug("Closing {} because a new IOFSwitch got added " +
1600 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001601 oldSw.getChannel().close();
1602 }
1603 finally {
1604 oldSw.getListenerWriteLock().unlock();
1605 }
1606 }
1607
1608 updateActiveSwitchInfo(sw);
1609 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1610 try {
1611 this.updates.put(update);
1612 } catch (InterruptedException e) {
1613 log.error("Failure adding update to queue", e);
1614 }
1615 }
1616
1617 /**
1618 * Remove a switch from the active switch list and call the switch listeners.
1619 * This happens either when the switch is disconnected or when the
1620 * controller's role for the switch changes from master to slave.
1621 * @param sw the switch that has been removed
1622 */
1623 protected void removeSwitch(IOFSwitch sw) {
1624 // No need to acquire the listener lock, since
1625 // this method is only called after netty has processed all
1626 // pending messages
1627 log.debug("removeSwitch: {}", sw);
1628 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1629 log.debug("Not removing switch {}; already removed", sw);
1630 return;
1631 }
1632 // We cancel all outstanding statistics replies if the switch transition
1633 // from active. In the future we might allow statistics requests
1634 // from slave controllers. Then we need to move this cancelation
1635 // to switch disconnect
1636 sw.cancelAllStatisticsReplies();
1637
1638 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1639 // here if role support is enabled. In that case if the switch is being
1640 // removed because we've been switched to being in the slave role, then I think
1641 // it's possible that the new master may have already been promoted to master
1642 // and written out the active switch state to storage. If we now execute
1643 // updateInactiveSwitchInfo we may wipe out all of the state that was
1644 // written out by the new master. Maybe need to revisit how we handle all
1645 // of the switch state that's written to storage.
1646
1647 updateInactiveSwitchInfo(sw);
1648 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1649 try {
1650 this.updates.put(update);
1651 } catch (InterruptedException e) {
1652 log.error("Failure adding update to queue", e);
1653 }
1654 }
1655
1656 // ***************
1657 // IFloodlightProvider
1658 // ***************
1659
1660 @Override
1661 public synchronized void addOFMessageListener(OFType type,
1662 IOFMessageListener listener) {
1663 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1664 messageListeners.get(type);
1665 if (ldd == null) {
1666 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1667 messageListeners.put(type, ldd);
1668 }
1669 ldd.addListener(type, listener);
1670 }
1671
1672 @Override
1673 public synchronized void removeOFMessageListener(OFType type,
1674 IOFMessageListener listener) {
1675 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1676 messageListeners.get(type);
1677 if (ldd != null) {
1678 ldd.removeListener(listener);
1679 }
1680 }
1681
1682 private void logListeners() {
1683 for (Map.Entry<OFType,
1684 ListenerDispatcher<OFType,
1685 IOFMessageListener>> entry
1686 : messageListeners.entrySet()) {
1687
1688 OFType type = entry.getKey();
1689 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1690 entry.getValue();
1691
1692 StringBuffer sb = new StringBuffer();
1693 sb.append("OFListeners for ");
1694 sb.append(type);
1695 sb.append(": ");
1696 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1697 sb.append(l.getName());
1698 sb.append(",");
1699 }
1700 log.debug(sb.toString());
1701 }
1702 }
1703
1704 public void removeOFMessageListeners(OFType type) {
1705 messageListeners.remove(type);
1706 }
1707
1708 @Override
1709 public Map<Long, IOFSwitch> getSwitches() {
1710 return Collections.unmodifiableMap(this.activeSwitches);
1711 }
1712
1713 @Override
1714 public void addOFSwitchListener(IOFSwitchListener listener) {
1715 this.switchListeners.add(listener);
1716 }
1717
1718 @Override
1719 public void removeOFSwitchListener(IOFSwitchListener listener) {
1720 this.switchListeners.remove(listener);
1721 }
1722
1723 @Override
1724 public Map<OFType, List<IOFMessageListener>> getListeners() {
1725 Map<OFType, List<IOFMessageListener>> lers =
1726 new HashMap<OFType, List<IOFMessageListener>>();
1727 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1728 messageListeners.entrySet()) {
1729 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1730 }
1731 return Collections.unmodifiableMap(lers);
1732 }
1733
1734 @Override
1735 @LogMessageDocs({
1736 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1737 "a null switch",
1738 explanation="Failed to process a message because the switch " +
1739 " is no longer connected."),
1740 @LogMessageDoc(level="ERROR",
1741 message="Error reinjecting OFMessage on switch {switch}",
1742 explanation="An I/O error occured while attempting to " +
1743 "process an OpenFlow message",
1744 recommendation=LogMessageDoc.CHECK_SWITCH)
1745 })
1746 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1747 FloodlightContext bc) {
1748 if (sw == null) {
1749 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1750 return false;
1751 }
1752
1753 // FIXME: Do we need to be able to inject messages to switches
1754 // where we're the slave controller (i.e. they're connected but
1755 // not active)?
1756 // FIXME: Don't we need synchronization logic here so we're holding
1757 // the listener read lock when we call handleMessage? After some
1758 // discussions it sounds like the right thing to do here would be to
1759 // inject the message as a netty upstream channel event so it goes
1760 // through the normal netty event processing, including being
1761 // handled
1762 if (!activeSwitches.containsKey(sw.getId())) return false;
1763
1764 try {
1765 // Pass Floodlight context to the handleMessages()
1766 handleMessage(sw, msg, bc);
1767 } catch (IOException e) {
1768 log.error("Error reinjecting OFMessage on switch {}",
1769 HexString.toHexString(sw.getId()));
1770 return false;
1771 }
1772 return true;
1773 }
1774
1775 @Override
1776 @LogMessageDoc(message="Calling System.exit",
1777 explanation="The controller is terminating")
1778 public synchronized void terminate() {
1779 log.info("Calling System.exit");
1780 System.exit(1);
1781 }
1782
1783 @Override
1784 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1785 // call the overloaded version with floodlight context set to null
1786 return injectOfMessage(sw, msg, null);
1787 }
1788
1789 @Override
1790 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1791 FloodlightContext bc) {
1792 if (log.isTraceEnabled()) {
1793 String str = OFMessage.getDataAsString(sw, m, bc);
1794 log.trace("{}", str);
1795 }
1796
1797 List<IOFMessageListener> listeners = null;
1798 if (messageListeners.containsKey(m.getType())) {
1799 listeners =
1800 messageListeners.get(m.getType()).getOrderedListeners();
1801 }
1802
1803 if (listeners != null) {
1804 for (IOFMessageListener listener : listeners) {
1805 if (listener instanceof IOFSwitchFilter) {
1806 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1807 continue;
1808 }
1809 }
1810 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1811 break;
1812 }
1813 }
1814 }
1815 }
1816
1817 @Override
1818 public BasicFactory getOFMessageFactory() {
1819 return factory;
1820 }
1821
1822 @Override
1823 public String getControllerId() {
1824 return controllerId;
1825 }
1826
1827 // **************
1828 // Initialization
1829 // **************
1830
1831 protected void updateAllInactiveSwitchInfo() {
1832 if (role == Role.SLAVE) {
1833 return;
1834 }
1835 String controllerId = getControllerId();
1836 String[] switchColumns = { SWITCH_DATAPATH_ID,
1837 SWITCH_CONTROLLER_ID,
1838 SWITCH_ACTIVE };
1839 String[] portColumns = { PORT_ID, PORT_SWITCH };
1840 IResultSet switchResultSet = null;
1841 try {
1842 OperatorPredicate op =
1843 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1844 OperatorPredicate.Operator.EQ,
1845 controllerId);
1846 switchResultSet =
1847 storageSource.executeQuery(SWITCH_TABLE_NAME,
1848 switchColumns,
1849 op, null);
1850 while (switchResultSet.next()) {
1851 IResultSet portResultSet = null;
1852 try {
1853 String datapathId =
1854 switchResultSet.getString(SWITCH_DATAPATH_ID);
1855 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1856 op = new OperatorPredicate(PORT_SWITCH,
1857 OperatorPredicate.Operator.EQ,
1858 datapathId);
1859 portResultSet =
1860 storageSource.executeQuery(PORT_TABLE_NAME,
1861 portColumns,
1862 op, null);
1863 while (portResultSet.next()) {
1864 portResultSet.deleteRow();
1865 }
1866 portResultSet.save();
1867 }
1868 finally {
1869 if (portResultSet != null)
1870 portResultSet.close();
1871 }
1872 }
1873 switchResultSet.save();
1874 }
1875 finally {
1876 if (switchResultSet != null)
1877 switchResultSet.close();
1878 }
1879 }
1880
1881 protected void updateControllerInfo() {
1882 updateAllInactiveSwitchInfo();
1883
1884 // Write out the controller info to the storage source
1885 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1886 String id = getControllerId();
1887 controllerInfo.put(CONTROLLER_ID, id);
1888 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1889 }
1890
1891 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1892 if (role == Role.SLAVE) {
1893 return;
1894 }
1895 // Obtain the row info for the switch
1896 Map<String, Object> switchInfo = new HashMap<String, Object>();
1897 String datapathIdString = sw.getStringId();
1898 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1899 String controllerId = getControllerId();
1900 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1901 Date connectedSince = sw.getConnectedSince();
1902 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1903 Channel channel = sw.getChannel();
1904 SocketAddress socketAddress = channel.getRemoteAddress();
1905 if (socketAddress != null) {
1906 String socketAddressString = socketAddress.toString();
1907 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1908 if (socketAddress instanceof InetSocketAddress) {
1909 InetSocketAddress inetSocketAddress =
1910 (InetSocketAddress)socketAddress;
1911 InetAddress inetAddress = inetSocketAddress.getAddress();
1912 String ip = inetAddress.getHostAddress();
1913 switchInfo.put(SWITCH_IP, ip);
1914 }
1915 }
1916
1917 // Write out the switch features info
1918 long capabilities = U32.f(sw.getCapabilities());
1919 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1920 long buffers = U32.f(sw.getBuffers());
1921 switchInfo.put(SWITCH_BUFFERS, buffers);
1922 long tables = U32.f(sw.getTables());
1923 switchInfo.put(SWITCH_TABLES, tables);
1924 long actions = U32.f(sw.getActions());
1925 switchInfo.put(SWITCH_ACTIONS, actions);
1926 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1927
1928 // Update the switch
1929 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1930
1931 // Update the ports
1932 for (OFPhysicalPort port: sw.getPorts()) {
1933 updatePortInfo(sw, port);
1934 }
1935 }
1936
1937 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1938 if (role == Role.SLAVE) {
1939 return;
1940 }
1941 log.debug("Update DB with inactiveSW {}", sw);
1942 // Update the controller info in the storage source to be inactive
1943 Map<String, Object> switchInfo = new HashMap<String, Object>();
1944 String datapathIdString = sw.getStringId();
1945 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1946 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1947 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1948 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1949 }
1950
1951 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1952 if (role == Role.SLAVE) {
1953 return;
1954 }
1955 String datapathIdString = sw.getStringId();
1956 Map<String, Object> portInfo = new HashMap<String, Object>();
1957 int portNumber = U16.f(port.getPortNumber());
1958 String id = datapathIdString + "|" + portNumber;
1959 portInfo.put(PORT_ID, id);
1960 portInfo.put(PORT_SWITCH, datapathIdString);
1961 portInfo.put(PORT_NUMBER, portNumber);
1962 byte[] hardwareAddress = port.getHardwareAddress();
1963 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1964 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1965 String name = port.getName();
1966 portInfo.put(PORT_NAME, name);
1967 long config = U32.f(port.getConfig());
1968 portInfo.put(PORT_CONFIG, config);
1969 long state = U32.f(port.getState());
1970 portInfo.put(PORT_STATE, state);
1971 long currentFeatures = U32.f(port.getCurrentFeatures());
1972 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1973 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1974 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1975 long supportedFeatures = U32.f(port.getSupportedFeatures());
1976 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1977 long peerFeatures = U32.f(port.getPeerFeatures());
1978 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1979 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1980 }
1981
1982 /**
1983 * Read switch port data from storage and write it into a switch object
1984 * @param sw the switch to update
1985 */
1986 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1987 OperatorPredicate op =
1988 new OperatorPredicate(PORT_SWITCH,
1989 OperatorPredicate.Operator.EQ,
1990 sw.getStringId());
1991 IResultSet portResultSet =
1992 storageSource.executeQuery(PORT_TABLE_NAME,
1993 null, op, null);
1994 //Map<Short, OFPhysicalPort> oldports =
1995 // new HashMap<Short, OFPhysicalPort>();
1996 //oldports.putAll(sw.getPorts());
1997
1998 while (portResultSet.next()) {
1999 try {
2000 OFPhysicalPort p = new OFPhysicalPort();
2001 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
2002 p.setName(portResultSet.getString(PORT_NAME));
2003 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
2004 p.setState((int)portResultSet.getLong(PORT_STATE));
2005 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
2006 p.setHardwareAddress(HexString.fromHexString(portMac));
2007 p.setCurrentFeatures((int)portResultSet.
2008 getLong(PORT_CURRENT_FEATURES));
2009 p.setAdvertisedFeatures((int)portResultSet.
2010 getLong(PORT_ADVERTISED_FEATURES));
2011 p.setSupportedFeatures((int)portResultSet.
2012 getLong(PORT_SUPPORTED_FEATURES));
2013 p.setPeerFeatures((int)portResultSet.
2014 getLong(PORT_PEER_FEATURES));
2015 //oldports.remove(Short.valueOf(p.getPortNumber()));
2016 sw.setPort(p);
2017 } catch (NullPointerException e) {
2018 // ignore
2019 }
2020 }
2021 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
2022 try {
2023 this.updates.put(update);
2024 } catch (InterruptedException e) {
2025 log.error("Failure adding update to queue", e);
2026 }
2027 }
2028
2029 protected void removePortInfo(IOFSwitch sw, short portNumber) {
2030 if (role == Role.SLAVE) {
2031 return;
2032 }
2033 String datapathIdString = sw.getStringId();
2034 String id = datapathIdString + "|" + portNumber;
2035 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
2036 }
2037
2038 /**
2039 * Sets the initial role based on properties in the config params.
2040 * It looks for two different properties.
2041 * If the "role" property is specified then the value should be
2042 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
2043 * controller is set to the specified value. If the "role" property
2044 * is not specified then it looks next for the "role.path" property.
2045 * In this case the value should be the path to a property file in
2046 * the file system that contains a property called "floodlight.role"
2047 * which can be one of the values listed above for the "role" property.
2048 * The idea behind the "role.path" mechanism is that you have some
2049 * separate heartbeat and master controller election algorithm that
2050 * determines the role of the controller. When a role transition happens,
2051 * it updates the current role in the file specified by the "role.path"
2052 * file. Then if floodlight restarts for some reason it can get the
2053 * correct current role of the controller from the file.
2054 * @param configParams The config params for the FloodlightProvider service
2055 * @return A valid role if role information is specified in the
2056 * config params, otherwise null
2057 */
2058 @LogMessageDocs({
2059 @LogMessageDoc(message="Controller role set to {role}",
2060 explanation="Setting the initial HA role to "),
2061 @LogMessageDoc(level="ERROR",
2062 message="Invalid current role value: {role}",
2063 explanation="An invalid HA role value was read from the " +
2064 "properties file",
2065 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2066 })
2067 protected Role getInitialRole(Map<String, String> configParams) {
2068 Role role = null;
2069 String roleString = configParams.get("role");
2070 if (roleString == null) {
2071 String rolePath = configParams.get("rolepath");
2072 if (rolePath != null) {
2073 Properties properties = new Properties();
2074 try {
2075 properties.load(new FileInputStream(rolePath));
2076 roleString = properties.getProperty("floodlight.role");
2077 }
2078 catch (IOException exc) {
2079 // Don't treat it as an error if the file specified by the
2080 // rolepath property doesn't exist. This lets us enable the
2081 // HA mechanism by just creating/setting the floodlight.role
2082 // property in that file without having to modify the
2083 // floodlight properties.
2084 }
2085 }
2086 }
2087
2088 if (roleString != null) {
2089 // Canonicalize the string to the form used for the enum constants
2090 roleString = roleString.trim().toUpperCase();
2091 try {
2092 role = Role.valueOf(roleString);
2093 }
2094 catch (IllegalArgumentException exc) {
2095 log.error("Invalid current role value: {}", roleString);
2096 }
2097 }
2098
2099 log.info("Controller role set to {}", role);
2100
2101 return role;
2102 }
2103
2104 /**
2105 * Tell controller that we're ready to accept switches loop
2106 * @throws IOException
2107 */
2108 @LogMessageDocs({
2109 @LogMessageDoc(message="Listening for switch connections on {address}",
2110 explanation="The controller is ready and listening for new" +
2111 " switch connections"),
2112 @LogMessageDoc(message="Storage exception in controller " +
2113 "updates loop; terminating process",
2114 explanation=ERROR_DATABASE,
2115 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2116 @LogMessageDoc(level="ERROR",
2117 message="Exception in controller updates loop",
2118 explanation="Failed to dispatch controller event",
2119 recommendation=LogMessageDoc.GENERIC_ACTION)
2120 })
2121 public void run() {
2122 if (log.isDebugEnabled()) {
2123 logListeners();
2124 }
2125
2126 try {
2127 final ServerBootstrap bootstrap = createServerBootStrap();
2128
2129 bootstrap.setOption("reuseAddr", true);
2130 bootstrap.setOption("child.keepAlive", true);
2131 bootstrap.setOption("child.tcpNoDelay", true);
2132 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2133
2134 ChannelPipelineFactory pfact =
2135 new OpenflowPipelineFactory(this, null);
2136 bootstrap.setPipelineFactory(pfact);
2137 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2138 final ChannelGroup cg = new DefaultChannelGroup();
2139 cg.add(bootstrap.bind(sa));
2140
2141 log.info("Listening for switch connections on {}", sa);
2142 } catch (Exception e) {
2143 throw new RuntimeException(e);
2144 }
2145
2146 // main loop
2147 while (true) {
2148 try {
2149 IUpdate update = updates.take();
2150 update.dispatch();
2151 } catch (InterruptedException e) {
2152 return;
2153 } catch (StorageException e) {
2154 log.error("Storage exception in controller " +
2155 "updates loop; terminating process", e);
2156 return;
2157 } catch (Exception e) {
2158 log.error("Exception in controller updates loop", e);
2159 }
2160 }
2161 }
2162
2163 private ServerBootstrap createServerBootStrap() {
2164 if (workerThreads == 0) {
2165 return new ServerBootstrap(
2166 new NioServerSocketChannelFactory(
2167 Executors.newCachedThreadPool(),
2168 Executors.newCachedThreadPool()));
2169 } else {
2170 return new ServerBootstrap(
2171 new NioServerSocketChannelFactory(
2172 Executors.newCachedThreadPool(),
2173 Executors.newCachedThreadPool(), workerThreads));
2174 }
2175 }
2176
2177 public void setConfigParams(Map<String, String> configParams) {
2178 String ofPort = configParams.get("openflowport");
2179 if (ofPort != null) {
2180 this.openFlowPort = Integer.parseInt(ofPort);
2181 }
2182 log.debug("OpenFlow port set to {}", this.openFlowPort);
2183 String threads = configParams.get("workerthreads");
2184 if (threads != null) {
2185 this.workerThreads = Integer.parseInt(threads);
2186 }
2187 log.debug("Number of worker threads set to {}", this.workerThreads);
2188 String controllerId = configParams.get("controllerid");
2189 if (controllerId != null) {
2190 this.controllerId = controllerId;
2191 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002192 else {
2193 //Try to get the hostname of the machine and use that for controller ID
2194 try {
2195 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2196 this.controllerId = hostname;
2197 } catch (UnknownHostException e) {
2198 // Can't get hostname, we'll just use the default
2199 }
2200 }
2201
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002202 log.debug("ControllerId set to {}", this.controllerId);
2203 }
2204
2205 private void initVendorMessages() {
2206 // Configure openflowj to be able to parse the role request/reply
2207 // vendor messages.
2208 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2209 OFNiciraVendorData.NX_VENDOR_ID, 4);
2210 OFVendorId.registerVendorId(niciraVendorId);
2211 OFBasicVendorDataType roleRequestVendorData =
2212 new OFBasicVendorDataType(
2213 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2214 OFRoleRequestVendorData.getInstantiable());
2215 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2216 OFBasicVendorDataType roleReplyVendorData =
2217 new OFBasicVendorDataType(
2218 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2219 OFRoleReplyVendorData.getInstantiable());
2220 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2221 }
2222
2223 /**
2224 * Initialize internal data structures
2225 */
2226 public void init(Map<String, String> configParams) {
2227 // These data structures are initialized here because other
2228 // module's startUp() might be called before ours
2229 this.messageListeners =
2230 new ConcurrentHashMap<OFType,
2231 ListenerDispatcher<OFType,
2232 IOFMessageListener>>();
2233 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2234 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2235 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2236 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2237 this.controllerNodeIPsCache = new HashMap<String, String>();
2238 this.updates = new LinkedBlockingQueue<IUpdate>();
2239 this.factory = new BasicFactory();
2240 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2241 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002242 //this.role = getInitialRole(configParams);
2243 //Set the controller's role to MASTER so it always tries to do role requests.
2244 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002245 this.roleChanger = new RoleChanger();
2246 initVendorMessages();
2247 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002248 }
2249
2250 /**
2251 * Startup all of the controller's components
2252 */
2253 @LogMessageDoc(message="Waiting for storage source",
2254 explanation="The system database is not yet ready",
2255 recommendation="If this message persists, this indicates " +
2256 "that the system database has failed to start. " +
2257 LogMessageDoc.CHECK_CONTROLLER)
2258 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002259 try {
2260 registryService.registerController(controllerId);
2261 } catch (RegistryException e2) {
2262 log.warn("Registry service error: {}", e2.getMessage());
2263 }
2264
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002265 // Create the table names we use
2266 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2267 storageSource.createTable(SWITCH_TABLE_NAME, null);
2268 storageSource.createTable(PORT_TABLE_NAME, null);
2269 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2270 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2271 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2272 CONTROLLER_ID);
2273 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2274 SWITCH_DATAPATH_ID);
2275 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2276 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2277 CONTROLLER_INTERFACE_ID);
2278 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2279
2280 while (true) {
2281 try {
2282 updateControllerInfo();
2283 break;
2284 }
2285 catch (StorageException e) {
2286 log.info("Waiting for storage source");
2287 try {
2288 Thread.sleep(1000);
2289 } catch (InterruptedException e1) {
2290 }
2291 }
2292 }
2293
2294 // Add our REST API
2295 restApi.addRestletRoutable(new CoreWebRoutable());
2296 }
2297
2298 @Override
2299 public void addInfoProvider(String type, IInfoProvider provider) {
2300 if (!providerMap.containsKey(type)) {
2301 providerMap.put(type, new ArrayList<IInfoProvider>());
2302 }
2303 providerMap.get(type).add(provider);
2304 }
2305
2306 @Override
2307 public void removeInfoProvider(String type, IInfoProvider provider) {
2308 if (!providerMap.containsKey(type)) {
2309 log.debug("Provider type {} doesn't exist.", type);
2310 return;
2311 }
2312
2313 providerMap.get(type).remove(provider);
2314 }
2315
2316 public Map<String, Object> getControllerInfo(String type) {
2317 if (!providerMap.containsKey(type)) return null;
2318
2319 Map<String, Object> result = new LinkedHashMap<String, Object>();
2320 for (IInfoProvider provider : providerMap.get(type)) {
2321 result.putAll(provider.getInfo(type));
2322 }
2323
2324 return result;
2325 }
2326
2327 @Override
2328 public void addHAListener(IHAListener listener) {
2329 this.haListeners.add(listener);
2330 }
2331
2332 @Override
2333 public void removeHAListener(IHAListener listener) {
2334 this.haListeners.remove(listener);
2335 }
2336
2337
2338 /**
2339 * Handle changes to the controller nodes IPs and dispatch update.
2340 */
2341 @SuppressWarnings("unchecked")
2342 protected void handleControllerNodeIPChanges() {
2343 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2344 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2345 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2346 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2347 CONTROLLER_INTERFACE_TYPE,
2348 CONTROLLER_INTERFACE_NUMBER,
2349 CONTROLLER_INTERFACE_DISCOVERED_IP };
2350 synchronized(controllerNodeIPsCache) {
2351 // We currently assume that interface Ethernet0 is the relevant
2352 // controller interface. Might change.
2353 // We could (should?) implement this using
2354 // predicates, but creating the individual and compound predicate
2355 // seems more overhead then just checking every row. Particularly,
2356 // since the number of rows is small and changes infrequent
2357 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2358 colNames,null, null);
2359 while (res.next()) {
2360 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2361 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2362 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2363 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2364 String curIP = controllerNodeIPsCache.get(controllerID);
2365
2366 curControllerNodeIPs.put(controllerID, discoveredIP);
2367 if (curIP == null) {
2368 // new controller node IP
2369 addedControllerNodeIPs.put(controllerID, discoveredIP);
2370 }
HIGUCHI Yuta63b30722013-10-18 18:33:46 -07002371 else if (!curIP.equals(discoveredIP)) {
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002372 // IP changed
2373 removedControllerNodeIPs.put(controllerID, curIP);
2374 addedControllerNodeIPs.put(controllerID, discoveredIP);
2375 }
2376 }
2377 }
2378 // Now figure out if rows have been deleted. We can't use the
2379 // rowKeys from rowsDeleted directly, since the tables primary
2380 // key is a compound that we can't disassemble
2381 Set<String> curEntries = curControllerNodeIPs.keySet();
2382 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2383 removedEntries.removeAll(curEntries);
2384 for (String removedControllerID : removedEntries)
2385 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2386 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2387 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2388 curControllerNodeIPs, addedControllerNodeIPs,
2389 removedControllerNodeIPs);
2390 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2391 try {
2392 this.updates.put(update);
2393 } catch (InterruptedException e) {
2394 log.error("Failure adding update to queue", e);
2395 }
2396 }
2397 }
2398 }
2399
2400 @Override
2401 public Map<String, String> getControllerNodeIPs() {
2402 // We return a copy of the mapping so we can guarantee that
2403 // the mapping return is the same as one that will be (or was)
2404 // dispatched to IHAListeners
2405 HashMap<String,String> retval = new HashMap<String,String>();
2406 synchronized(controllerNodeIPsCache) {
2407 retval.putAll(controllerNodeIPsCache);
2408 }
2409 return retval;
2410 }
2411
2412 @Override
2413 public void rowsModified(String tableName, Set<Object> rowKeys) {
2414 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2415 handleControllerNodeIPChanges();
2416 }
2417
2418 }
2419
2420 @Override
2421 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2422 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2423 handleControllerNodeIPChanges();
2424 }
2425 }
2426
2427 @Override
2428 public long getSystemStartTime() {
2429 return (this.systemStartTime);
2430 }
2431
2432 @Override
2433 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2434 this.alwaysClearFlowsOnSwAdd = value;
2435 }
2436
2437 public boolean getAlwaysClearFlowsOnSwAdd() {
2438 return this.alwaysClearFlowsOnSwAdd;
2439 }
2440}