blob: bd7edb518f3056498683336cce716204fce46e27 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080058import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berdedc73bb12013-08-14 13:46:38 -070061import net.floodlightcontroller.core.IUpdate;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080062import net.floodlightcontroller.core.annotations.LogMessageDoc;
63import net.floodlightcontroller.core.annotations.LogMessageDocs;
64import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
65import net.floodlightcontroller.core.util.ListenerDispatcher;
66import net.floodlightcontroller.core.web.CoreWebRoutable;
67import net.floodlightcontroller.counter.ICounterStoreService;
68import net.floodlightcontroller.packet.Ethernet;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080069import net.floodlightcontroller.restserver.IRestApiService;
70import net.floodlightcontroller.storage.IResultSet;
71import net.floodlightcontroller.storage.IStorageSourceListener;
72import net.floodlightcontroller.storage.IStorageSourceService;
73import net.floodlightcontroller.storage.OperatorPredicate;
74import net.floodlightcontroller.storage.StorageException;
75import net.floodlightcontroller.threadpool.IThreadPoolService;
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -070076import net.onrc.onos.ofcontroller.core.IOFSwitchPortListener;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080077import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080078import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080079import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080080
81import org.jboss.netty.bootstrap.ServerBootstrap;
82import org.jboss.netty.buffer.ChannelBuffer;
83import org.jboss.netty.buffer.ChannelBuffers;
84import org.jboss.netty.channel.Channel;
85import org.jboss.netty.channel.ChannelHandlerContext;
86import org.jboss.netty.channel.ChannelPipelineFactory;
87import org.jboss.netty.channel.ChannelStateEvent;
88import org.jboss.netty.channel.ChannelUpstreamHandler;
89import org.jboss.netty.channel.Channels;
90import org.jboss.netty.channel.ExceptionEvent;
91import org.jboss.netty.channel.MessageEvent;
92import org.jboss.netty.channel.group.ChannelGroup;
93import org.jboss.netty.channel.group.DefaultChannelGroup;
94import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
95import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
96import org.jboss.netty.handler.timeout.IdleStateEvent;
97import org.jboss.netty.handler.timeout.ReadTimeoutException;
98import org.openflow.protocol.OFEchoReply;
99import org.openflow.protocol.OFError;
100import org.openflow.protocol.OFError.OFBadActionCode;
101import org.openflow.protocol.OFError.OFBadRequestCode;
102import org.openflow.protocol.OFError.OFErrorType;
103import org.openflow.protocol.OFError.OFFlowModFailedCode;
104import org.openflow.protocol.OFError.OFHelloFailedCode;
105import org.openflow.protocol.OFError.OFPortModFailedCode;
106import org.openflow.protocol.OFError.OFQueueOpFailedCode;
107import org.openflow.protocol.OFFeaturesReply;
108import org.openflow.protocol.OFGetConfigReply;
109import org.openflow.protocol.OFMessage;
110import org.openflow.protocol.OFPacketIn;
111import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800112import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800113import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800114import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800115import org.openflow.protocol.OFPortStatus.OFPortReason;
116import org.openflow.protocol.OFSetConfig;
117import org.openflow.protocol.OFStatisticsRequest;
118import org.openflow.protocol.OFSwitchConfig;
119import org.openflow.protocol.OFType;
120import org.openflow.protocol.OFVendor;
121import org.openflow.protocol.factory.BasicFactory;
122import org.openflow.protocol.factory.MessageParseException;
123import org.openflow.protocol.statistics.OFDescriptionStatistics;
124import org.openflow.protocol.statistics.OFStatistics;
125import org.openflow.protocol.statistics.OFStatisticsType;
126import org.openflow.protocol.vendor.OFBasicVendorDataType;
127import org.openflow.protocol.vendor.OFBasicVendorId;
128import org.openflow.protocol.vendor.OFVendorId;
129import org.openflow.util.HexString;
130import org.openflow.util.U16;
131import org.openflow.util.U32;
132import org.openflow.vendor.nicira.OFNiciraVendorData;
133import org.openflow.vendor.nicira.OFRoleReplyVendorData;
134import org.openflow.vendor.nicira.OFRoleRequestVendorData;
135import org.openflow.vendor.nicira.OFRoleVendorData;
136import org.slf4j.Logger;
137import org.slf4j.LoggerFactory;
138
139
140/**
141 * The main controller class. Handles all setup and network listeners
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700142 *
143 * Extensions made by ONOS are:
144 * - Detailed Port event: PORTCHANGED -> {PORTCHANGED, PORTADDED, PORTREMOVED}
145 * Available as net.onrc.onos.ofcontroller.core.IOFSwitchPortListener
146 * - Distributed ownership control of switch through RegistryService(IControllerRegistryService)
Pavlin Radoslavova653e9f2013-10-16 03:08:52 -0700147 * - Register ONOS services. (IControllerRegistryService)
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700148 * - Additional DEBUG logs
149 * - Try using hostname as controller ID, when ID was not explicitly given.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800150 */
151public class Controller implements IFloodlightProviderService,
152 IStorageSourceListener {
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700153
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -0700154 protected final static Logger log = LoggerFactory.getLogger(Controller.class);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800155
156 private static final String ERROR_DATABASE =
157 "The controller could not communicate with the system database.";
158
159 protected BasicFactory factory;
160 protected ConcurrentMap<OFType,
161 ListenerDispatcher<OFType,IOFMessageListener>>
162 messageListeners;
163 // The activeSwitches map contains only those switches that are actively
164 // being controlled by us -- it doesn't contain switches that are
165 // in the slave role
166 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
167 // connectedSwitches contains all connected switches, including ones where
168 // we're a slave controller. We need to keep track of them so that we can
169 // send role request messages to switches when our role changes to master
170 // We add a switch to this set after it successfully completes the
171 // handshake. Access to this Set needs to be synchronized with roleChanger
172 protected HashSet<OFSwitchImpl> connectedSwitches;
173
174 // The controllerNodeIPsCache maps Controller IDs to their IP address.
175 // It's only used by handleControllerNodeIPsChanged
176 protected HashMap<String, String> controllerNodeIPsCache;
177
178 protected Set<IOFSwitchListener> switchListeners;
179 protected Set<IHAListener> haListeners;
180 protected Map<String, List<IInfoProvider>> providerMap;
181 protected BlockingQueue<IUpdate> updates;
182
183 // Module dependencies
184 protected IRestApiService restApi;
185 protected ICounterStoreService counterStore = null;
186 protected IStorageSourceService storageSource;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800187 protected IThreadPoolService threadPool;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800188 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800189
190 // Configuration options
191 protected int openFlowPort = 6633;
192 protected int workerThreads = 0;
193 // The id for this controller node. Should be unique for each controller
194 // node in a controller cluster.
195 protected String controllerId = "localhost";
196
197 // The current role of the controller.
198 // If the controller isn't configured to support roles, then this is null.
199 protected Role role;
200 // A helper that handles sending and timeout handling for role requests
201 protected RoleChanger roleChanger;
202
203 // Start time of the controller
204 protected long systemStartTime;
205
206 // Flag to always flush flow table on switch reconnect (HA or otherwise)
207 protected boolean alwaysClearFlowsOnSwAdd = false;
208
209 // Storage table names
210 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
211 protected static final String CONTROLLER_ID = "id";
212
213 protected static final String SWITCH_TABLE_NAME = "controller_switch";
214 protected static final String SWITCH_DATAPATH_ID = "dpid";
215 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
216 protected static final String SWITCH_IP = "ip";
217 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
218 protected static final String SWITCH_ACTIVE = "active";
219 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
220 protected static final String SWITCH_CAPABILITIES = "capabilities";
221 protected static final String SWITCH_BUFFERS = "buffers";
222 protected static final String SWITCH_TABLES = "tables";
223 protected static final String SWITCH_ACTIONS = "actions";
224
225 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
226 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
227
228 protected static final String PORT_TABLE_NAME = "controller_port";
229 protected static final String PORT_ID = "id";
230 protected static final String PORT_SWITCH = "switch_id";
231 protected static final String PORT_NUMBER = "number";
232 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
233 protected static final String PORT_NAME = "name";
234 protected static final String PORT_CONFIG = "config";
235 protected static final String PORT_STATE = "state";
236 protected static final String PORT_CURRENT_FEATURES = "current_features";
237 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
238 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
239 protected static final String PORT_PEER_FEATURES = "peer_features";
240
241 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
242 protected static final String CONTROLLER_INTERFACE_ID = "id";
243 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
244 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
245 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
246 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
247
248
249
250 // Perf. related configuration
251 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
252 protected static final int BATCH_MAX_SIZE = 100;
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700253 protected static final boolean ALWAYS_DECODE_ETH = true;
254
/**
 * Kinds of switch events carried by a {@code SwitchUpdate}.
 * PORTADDED and PORTREMOVED are the detailed port events added by ONOS.
 */
public enum SwitchUpdateType {
    /** A switch was added. */
    ADDED,
    /** A switch was removed. */
    REMOVED,
    /** A port on a switch changed. */
    PORTCHANGED,
    /** A port was added to a switch. */
    PORTADDED,
    /** A port was removed from a switch. */
    PORTREMOVED
}
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700262
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800263 /**
264 * Update message indicating a switch was added or removed
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700265 * ONOS: This message extended to indicate Port add or removed event.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800266 */
267 protected class SwitchUpdate implements IUpdate {
268 public IOFSwitch sw;
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700269 public OFPhysicalPort port; // Added by ONOS
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800270 public SwitchUpdateType switchUpdateType;
271 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
272 this.sw = sw;
273 this.switchUpdateType = switchUpdateType;
274 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700275 public SwitchUpdate(IOFSwitch sw, OFPhysicalPort port, SwitchUpdateType switchUpdateType) {
276 this.sw = sw;
277 this.port = port;
278 this.switchUpdateType = switchUpdateType;
279 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800280 public void dispatch() {
281 if (log.isTraceEnabled()) {
282 log.trace("Dispatching switch update {} {}",
283 sw, switchUpdateType);
284 }
285 if (switchListeners != null) {
286 for (IOFSwitchListener listener : switchListeners) {
287 switch(switchUpdateType) {
288 case ADDED:
289 listener.addedSwitch(sw);
290 break;
291 case REMOVED:
292 listener.removedSwitch(sw);
293 break;
294 case PORTCHANGED:
295 listener.switchPortChanged(sw.getId());
296 break;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700297 case PORTADDED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700298 if (listener instanceof IOFSwitchPortListener) {
299 ((IOFSwitchPortListener) listener).switchPortAdded(sw.getId(), port);
300 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700301 break;
302 case PORTREMOVED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700303 if (listener instanceof IOFSwitchPortListener) {
304 ((IOFSwitchPortListener) listener).switchPortRemoved(sw.getId(), port);
305 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700306 break;
307 default:
308 break;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800309 }
310 }
311 }
312 }
313 }
314
315 /**
316 * Update message indicating controller's role has changed
317 */
318 protected class HARoleUpdate implements IUpdate {
319 public Role oldRole;
320 public Role newRole;
321 public HARoleUpdate(Role newRole, Role oldRole) {
322 this.oldRole = oldRole;
323 this.newRole = newRole;
324 }
325 public void dispatch() {
326 // Make sure that old and new roles are different.
327 if (oldRole == newRole) {
328 if (log.isTraceEnabled()) {
329 log.trace("HA role update ignored as the old and " +
330 "new roles are the same. newRole = {}" +
331 "oldRole = {}", newRole, oldRole);
332 }
333 return;
334 }
335 if (log.isTraceEnabled()) {
336 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
337 newRole, oldRole);
338 }
339 if (haListeners != null) {
340 for (IHAListener listener : haListeners) {
341 listener.roleChanged(oldRole, newRole);
342 }
343 }
344 }
345 }
346
347 /**
348 * Update message indicating
349 * IPs of controllers in controller cluster have changed.
350 */
351 protected class HAControllerNodeIPUpdate implements IUpdate {
352 public Map<String,String> curControllerNodeIPs;
353 public Map<String,String> addedControllerNodeIPs;
354 public Map<String,String> removedControllerNodeIPs;
355 public HAControllerNodeIPUpdate(
356 HashMap<String,String> curControllerNodeIPs,
357 HashMap<String,String> addedControllerNodeIPs,
358 HashMap<String,String> removedControllerNodeIPs) {
359 this.curControllerNodeIPs = curControllerNodeIPs;
360 this.addedControllerNodeIPs = addedControllerNodeIPs;
361 this.removedControllerNodeIPs = removedControllerNodeIPs;
362 }
363 public void dispatch() {
364 if (log.isTraceEnabled()) {
365 log.trace("Dispatching HA Controller Node IP update "
366 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
367 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
368 removedControllerNodeIPs }
369 );
370 }
371 if (haListeners != null) {
372 for (IHAListener listener: haListeners) {
373 listener.controllerNodeIPsChanged(curControllerNodeIPs,
374 addedControllerNodeIPs, removedControllerNodeIPs);
375 }
376 }
377 }
378 }
379
380 // ***************
381 // Getters/Setters
382 // ***************
383
384 public void setStorageSourceService(IStorageSourceService storageSource) {
385 this.storageSource = storageSource;
386 }
387
388 public void setCounterStore(ICounterStoreService counterStore) {
389 this.counterStore = counterStore;
390 }
391
mininet73e7fb72013-12-03 14:25:53 -0800392
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800393
394 public void setRestApiService(IRestApiService restApi) {
395 this.restApi = restApi;
396 }
397
398 public void setThreadPoolService(IThreadPoolService tp) {
399 this.threadPool = tp;
400 }
401
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800402 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800403 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800404 }
405
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800406 @Override
407 public Role getRole() {
408 synchronized(roleChanger) {
409 return role;
410 }
411 }
412
413 @Override
414 public void setRole(Role role) {
415 if (role == null) throw new NullPointerException("Role can not be null.");
416 if (role == Role.MASTER && this.role == Role.SLAVE) {
417 // Reset db state to Inactive for all switches.
418 updateAllInactiveSwitchInfo();
419 }
420
421 // Need to synchronize to ensure a reliable ordering on role request
422 // messages send and to ensure the list of connected switches is stable
423 // RoleChanger will handle the actual sending of the message and
424 // timeout handling
425 // @see RoleChanger
426 synchronized(roleChanger) {
427 if (role.equals(this.role)) {
428 log.debug("Ignoring role change: role is already {}", role);
429 return;
430 }
431
432 Role oldRole = this.role;
433 this.role = role;
434
435 log.debug("Submitting role change request to role {}", role);
436 roleChanger.submitRequest(connectedSwitches, role);
437
438 // Enqueue an update for our listeners.
439 try {
440 this.updates.put(new HARoleUpdate(role, oldRole));
441 } catch (InterruptedException e) {
442 log.error("Failure adding update to queue", e);
443 }
444 }
445 }
446
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700447 public void publishUpdate(IUpdate update) {
448 try {
449 this.updates.put(update);
450 } catch (InterruptedException e) {
451 log.error("Failure adding update to queue", e);
452 }
453 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800454
455 // **********************
456 // ChannelUpstreamHandler
457 // **********************
458
459 /**
460 * Return a new channel handler for processing a switch connections
461 * @param state The channel state object for the connection
462 * @return the new channel handler
463 */
464 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
465 return new OFChannelHandler(state);
466 }
467
Jonathan Hartcc957a02013-02-26 10:39:04 -0800468 protected class RoleChangeCallback implements ControlChangeCallback {
469 @Override
470 public void controlChanged(long dpid, boolean hasControl) {
471 log.info("Role change callback for switch {}, hasControl {}",
472 HexString.toHexString(dpid), hasControl);
473
474 synchronized(roleChanger){
475 OFSwitchImpl sw = null;
476 for (OFSwitchImpl connectedSw : connectedSwitches){
477 if (connectedSw.getId() == dpid){
478 sw = connectedSw;
479 break;
480 }
481 }
482 if (sw == null){
483 log.warn("Switch {} not found in connected switches",
484 HexString.toHexString(dpid));
485 return;
486 }
487
488 Role role = null;
489
Pankaj Berde01939e92013-03-08 14:38:27 -0800490 /*
491 * issue #229
492 * Cannot rely on sw.getRole() as it can be behind due to pending
493 * role changes in the queue. Just submit it and late the RoleChanger
494 * handle duplicates.
495 */
496
497 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800498 role = Role.MASTER;
499 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800500 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800501 role = Role.SLAVE;
502 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800503
504 log.debug("Sending role request {} msg to {}", role, sw);
505 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
506 swList.add(sw);
507 roleChanger.submitRequest(swList, role);
508
Jonathan Hartcc957a02013-02-26 10:39:04 -0800509 }
510
511 }
512 }
513
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800514 /**
515 * Channel handler deals with the switch connection and dispatches
516 * switch messages to the appropriate locations.
517 * @author readams
518 */
519 protected class OFChannelHandler
520 extends IdleStateAwareChannelUpstreamHandler {
521 protected OFSwitchImpl sw;
522 protected OFChannelState state;
523
/**
 * Creates a handler bound to the given per-connection channel state.
 * @param state the channel state object for the connection
 */
public OFChannelHandler(OFChannelState state) {
    this.state = state;
}
527
528 @Override
529 @LogMessageDoc(message="New switch connection from {ip address}",
530 explanation="A new switch has connected from the " +
531 "specified IP address")
532 public void channelConnected(ChannelHandlerContext ctx,
533 ChannelStateEvent e) throws Exception {
534 log.info("New switch connection from {}",
535 e.getChannel().getRemoteAddress());
536
537 sw = new OFSwitchImpl();
538 sw.setChannel(e.getChannel());
539 sw.setFloodlightProvider(Controller.this);
540 sw.setThreadPoolService(threadPool);
541
542 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
543 msglist.add(factory.getMessage(OFType.HELLO));
544 e.getChannel().write(msglist);
545
546 }
547
548 @Override
549 @LogMessageDoc(message="Disconnected switch {switch information}",
550 explanation="The specified switch has disconnected.")
551 public void channelDisconnected(ChannelHandlerContext ctx,
552 ChannelStateEvent e) throws Exception {
553 if (sw != null && state.hsState == HandshakeState.READY) {
554 if (activeSwitches.containsKey(sw.getId())) {
555 // It's safe to call removeSwitch even though the map might
556 // not contain this particular switch but another with the
557 // same DPID
558 removeSwitch(sw);
559 }
560 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700561 if (controlRequested) {
562 registryService.releaseControl(sw.getId());
563 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800564 connectedSwitches.remove(sw);
565 }
566 sw.setConnected(false);
567 }
568 log.info("Disconnected switch {}", sw);
569 }
570
/**
 * Handles an exception raised on the switch channel. Most causes are
 * logged and the channel is closed. The exceptions: an already closed
 * channel is ignored, a storage exception terminates the controller, and a
 * rejected execution (queue full) is only logged.
 */
@Override
@LogMessageDocs({
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to read timeout",
            explanation="The connected switch has failed to send any " +
                        "messages or respond to echo requests",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch}: failed to " +
                    "complete handshake",
            explanation="The switch did not respond correctly " +
                        "to handshake messages",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to IO Error: {}",
            explanation="There was an error communicating with the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to switch " +
                    "state error: {error}",
            explanation="The switch sent an unexpected message",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to " +
                    "message parse failure",
            explanation="Could not parse a message from the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Terminating controller due to storage exception",
            explanation=ERROR_DATABASE,
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Could not process message: queue full",
            explanation="OpenFlow messages are arriving faster than " +
                        " the controller can process them.",
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Error while processing message " +
                    "from switch {switch} {cause}",
            explanation="An error occurred processing the switch message",
            recommendation=LogMessageDoc.GENERIC_ACTION)
})
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    final Throwable cause = e.getCause();
    // Most failures end with the channel being closed; the branches that
    // must keep it open clear this flag.
    boolean closeChannel = true;

    if (cause instanceof ReadTimeoutException) {
        // switch timeout
        log.error("Disconnecting switch {} due to read timeout", sw);
    } else if (cause instanceof HandshakeTimeoutException) {
        log.error("Disconnecting switch {}: failed to complete handshake",
                  sw);
    } else if (cause instanceof ClosedChannelException) {
        // Channel is already closed: nothing to log, nothing to close.
        closeChannel = false;
    } else if (cause instanceof IOException) {
        log.error("Disconnecting switch {} due to IO Error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof SwitchStateException) {
        log.error("Disconnecting switch {} due to switch state error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof MessageParseException) {
        log.error("Disconnecting switch " + sw +
                  " due to message parse failure",
                  cause);
    } else if (cause instanceof StorageException) {
        log.error("Terminating controller due to storage exception",
                  cause);
        terminate();
        closeChannel = false;
    } else if (cause instanceof RejectedExecutionException) {
        log.warn("Could not process message: queue full");
        closeChannel = false;
    } else {
        log.error("Error while processing message from switch " + sw,
                  cause);
    }

    if (closeChannel) {
        ctx.getChannel().close();
    }
}
650
651 @Override
652 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
653 throws Exception {
654 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
655 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
656 e.getChannel().write(msglist);
657 }
658
659 @Override
660 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
661 throws Exception {
662 if (e.getMessage() instanceof List) {
663 @SuppressWarnings("unchecked")
664 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
665
666 for (OFMessage ofm : msglist) {
667 try {
668 processOFMessage(ofm);
669 }
670 catch (Exception ex) {
671 // We are the last handler in the stream, so run the
672 // exception through the channel again by passing in
673 // ctx.getChannel().
674 Channels.fireExceptionCaught(ctx.getChannel(), ex);
675 }
676 }
677
678 // Flush all flow-mods/packet-out generated from this "train"
679 OFSwitchImpl.flush_all();
680 }
681 }
682
683 /**
684 * Process the request for the switch description
685 */
686 @LogMessageDoc(level="ERROR",
687 message="Exception in reading description " +
688 " during handshake {exception}",
689 explanation="Could not process the switch description string",
690 recommendation=LogMessageDoc.CHECK_SWITCH)
691 void processSwitchDescReply() {
692 try {
693 // Read description, if it has been updated
694 @SuppressWarnings("unchecked")
695 Future<List<OFStatistics>> desc_future =
696 (Future<List<OFStatistics>>)sw.
697 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
698 List<OFStatistics> values =
699 desc_future.get(0, TimeUnit.MILLISECONDS);
700 if (values != null) {
701 OFDescriptionStatistics description =
702 new OFDescriptionStatistics();
703 ChannelBuffer data =
704 ChannelBuffers.buffer(description.getLength());
705 for (OFStatistics f : values) {
706 f.writeTo(data);
707 description.readFrom(data);
708 break; // SHOULD be a list of length 1
709 }
710 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
711 description);
712 sw.setSwitchProperties(description);
713 data = null;
714
715 // At this time, also set other switch properties from storage
716 boolean is_core_switch = false;
717 IResultSet resultSet = null;
718 try {
719 String swid = sw.getStringId();
720 resultSet =
721 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
722 for (Iterator<IResultSet> it =
723 resultSet.iterator(); it.hasNext();) {
724 // In case of multiple rows, use the status
725 // in last row?
726 Map<String, Object> row = it.next().getRow();
727 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
728 if (log.isDebugEnabled()) {
729 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
730 "config for switch={}, is-core={}",
731 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
732 }
733 String ics =
734 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
735 is_core_switch = ics.equals("true");
736 }
737 }
738 }
739 finally {
740 if (resultSet != null)
741 resultSet.close();
742 }
743 if (is_core_switch) {
744 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
Yuta HIGUCHIe6a7aa72013-10-14 15:54:03 -0700745 true);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800746 }
747 }
748 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
749 state.hasDescription = true;
750 checkSwitchReady();
751 }
752 catch (InterruptedException ex) {
753 // Ignore
754 }
755 catch (TimeoutException ex) {
756 // Ignore
757 } catch (Exception ex) {
758 log.error("Exception in reading description " +
759 " during handshake", ex);
760 }
761 }
762
763 /**
764 * Send initial switch setup information that we need before adding
765 * the switch
766 * @throws IOException
767 */
768 void sendHelloConfiguration() throws IOException {
769 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700770 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800771 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
772 }
773
774 /**
775 * Send the configuration requests we can only do after we have
776 * the features reply
777 * @throws IOException
778 */
779 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700780 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800781 // Ensure we receive the full packet via PacketIn
782 OFSetConfig config = (OFSetConfig) factory
783 .getMessage(OFType.SET_CONFIG);
784 config.setMissSendLength((short) 0xffff)
785 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
786 sw.write(config, null);
787 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
788 null);
789
790 // Get Description to set switch-specific flags
791 OFStatisticsRequest req = new OFStatisticsRequest();
792 req.setStatisticType(OFStatisticsType.DESC);
793 req.setLengthU(req.getLengthU());
794 Future<List<OFStatistics>> dfuture =
795 sw.getStatistics(req);
796 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
797 dfuture);
798
799 }
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700800
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700801 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800802 protected void checkSwitchReady() {
803 if (state.hsState == HandshakeState.FEATURES_REPLY &&
804 state.hasDescription && state.hasGetConfigReply) {
805
806 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700807 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800808
809 synchronized(roleChanger) {
810 // We need to keep track of all of the switches that are connected
811 // to the controller, in any role, so that we can later send the
812 // role request messages when the controller role changes.
813 // We need to be synchronized while doing this: we must not
814 // send a another role request to the connectedSwitches until
815 // we were able to add this new switch to connectedSwitches
816 // *and* send the current role to the new switch.
817 connectedSwitches.add(sw);
818
819 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800820 //Put the switch in SLAVE mode until we know we have control
821 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
822 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
823 swList.add(sw);
824 roleChanger.submitRequest(swList, Role.SLAVE);
825
Jonathan Hartcc957a02013-02-26 10:39:04 -0800826 //Request control of the switch from the global registry
827 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700828 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800829 registryService.requestControl(sw.getId(),
830 new RoleChangeCallback());
831 } catch (RegistryException e) {
832 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700833 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800834 }
835
Jonathan Hart97801ac2013-02-26 14:29:16 -0800836
Jonathan Hartcc957a02013-02-26 10:39:04 -0800837
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800838 // Send a role request if role support is enabled for the controller
839 // This is a probe that we'll use to determine if the switch
840 // actually supports the role request message. If it does we'll
841 // get back a role reply message. If it doesn't we'll get back an
842 // OFError message.
843 // If role is MASTER we will promote switch to active
844 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800845 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800846 log.debug("This controller's role is {}, " +
847 "sending initial role request msg to {}",
848 role, sw);
849 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
850 swList.add(sw);
851 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800852 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800853 }
854 else {
855 // Role supported not enabled on controller (for now)
856 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800857 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800858 "not sending role request msg to {}",
859 role, sw);
860 // Need to clear FlowMods before we add the switch
861 // and dispatch updates otherwise we have a race condition.
862 sw.clearAllFlowMods();
863 addSwitch(sw);
864 state.firstRoleReplyReceived = true;
865 }
866 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700867 if (!controlRequested) {
868 // yield to allow other thread(s) to release control
869 try {
870 Thread.sleep(10);
871 } catch (InterruptedException e) {
872 // Ignore interruptions
873 }
874 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700875 log.debug("Closing {} because we weren't able to request control " +
876 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700877 sw.channel.close();
878 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800879 }
880 }
881
882 /* Handle a role reply message we received from the switch. Since
883 * netty serializes message dispatch we don't need to synchronize
884 * against other receive operations from the same switch, so no need
885 * to synchronize addSwitch(), removeSwitch() operations from the same
886 * connection.
887 * FIXME: However, when a switch with the same DPID connects we do
888 * need some synchronization. However, handling switches with same
889 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
890 * removedSwitch notification):1
891 *
892 */
893 @LogMessageDoc(level="ERROR",
894 message="Invalid role value in role reply message",
895 explanation="Was unable to set the HA role (master or slave) " +
896 "for the controller.",
897 recommendation=LogMessageDoc.CHECK_CONTROLLER)
898 protected void handleRoleReplyMessage(OFVendor vendorMessage,
899 OFRoleReplyVendorData roleReplyVendorData) {
900 // Map from the role code in the message to our role enum
901 int nxRole = roleReplyVendorData.getRole();
902 Role role = null;
903 switch (nxRole) {
904 case OFRoleVendorData.NX_ROLE_OTHER:
905 role = Role.EQUAL;
906 break;
907 case OFRoleVendorData.NX_ROLE_MASTER:
908 role = Role.MASTER;
909 break;
910 case OFRoleVendorData.NX_ROLE_SLAVE:
911 role = Role.SLAVE;
912 break;
913 default:
914 log.error("Invalid role value in role reply message");
915 sw.getChannel().close();
916 return;
917 }
918
919 log.debug("Handling role reply for role {} from {}. " +
920 "Controller's role is {} ",
921 new Object[] { role, sw, Controller.this.role}
922 );
923
924 sw.deliverRoleReply(vendorMessage.getXid(), role);
925
926 boolean isActive = activeSwitches.containsKey(sw.getId());
927 if (!isActive && sw.isActive()) {
928 // Transition from SLAVE to MASTER.
929
930 if (!state.firstRoleReplyReceived ||
931 getAlwaysClearFlowsOnSwAdd()) {
932 // This is the first role-reply message we receive from
933 // this switch or roles were disabled when the switch
934 // connected:
935 // Delete all pre-existing flows for new connections to
936 // the master
937 //
938 // FIXME: Need to think more about what the test should
939 // be for when we flush the flow-table? For example,
940 // if all the controllers are temporarily in the backup
941 // role (e.g. right after a failure of the master
942 // controller) at the point the switch connects, then
943 // all of the controllers will initially connect as
944 // backup controllers and not flush the flow-table.
945 // Then when one of them is promoted to master following
946 // the master controller election the flow-table
947 // will still not be flushed because that's treated as
948 // a failover event where we don't want to flush the
949 // flow-table. The end result would be that the flow
950 // table for a newly connected switch is never
951 // flushed. Not sure how to handle that case though...
952 sw.clearAllFlowMods();
953 log.debug("First role reply from master switch {}, " +
954 "clear FlowTable to active switch list",
955 HexString.toHexString(sw.getId()));
956 }
957
958 // Some switches don't seem to update us with port
959 // status messages while in slave role.
960 readSwitchPortStateFromStorage(sw);
961
962 // Only add the switch to the active switch list if
963 // we're not in the slave role. Note that if the role
964 // attribute is null, then that means that the switch
965 // doesn't support the role request messages, so in that
966 // case we're effectively in the EQUAL role and the
967 // switch should be included in the active switch list.
968 addSwitch(sw);
969 log.debug("Added master switch {} to active switch list",
970 HexString.toHexString(sw.getId()));
971
972 }
973 else if (isActive && !sw.isActive()) {
974 // Transition from MASTER to SLAVE: remove switch
975 // from active switch list.
976 log.debug("Removed slave switch {} from active switch" +
977 " list", HexString.toHexString(sw.getId()));
978 removeSwitch(sw);
979 }
980
981 // Indicate that we have received a role reply message.
982 state.firstRoleReplyReceived = true;
983 }
984
985 protected boolean handleVendorMessage(OFVendor vendorMessage) {
986 boolean shouldHandleMessage = false;
987 int vendor = vendorMessage.getVendor();
988 switch (vendor) {
989 case OFNiciraVendorData.NX_VENDOR_ID:
990 OFNiciraVendorData niciraVendorData =
991 (OFNiciraVendorData)vendorMessage.getVendorData();
992 int dataType = niciraVendorData.getDataType();
993 switch (dataType) {
994 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
995 OFRoleReplyVendorData roleReplyVendorData =
996 (OFRoleReplyVendorData) niciraVendorData;
997 handleRoleReplyMessage(vendorMessage,
998 roleReplyVendorData);
999 break;
1000 default:
1001 log.warn("Unhandled Nicira VENDOR message; " +
1002 "data type = {}", dataType);
1003 break;
1004 }
1005 break;
1006 default:
1007 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
1008 break;
1009 }
1010
1011 return shouldHandleMessage;
1012 }
1013
1014 /**
1015 * Dispatch an Openflow message from a switch to the appropriate
1016 * handler.
1017 * @param m The message to process
1018 * @throws IOException
1019 * @throws SwitchStateException
1020 */
1021 @LogMessageDocs({
1022 @LogMessageDoc(level="WARN",
1023 message="Config Reply from {switch} has " +
1024 "miss length set to {length}",
1025 explanation="The controller requires that the switch " +
1026 "use a miss length of 0xffff for correct " +
1027 "function",
1028 recommendation="Use a different switch to ensure " +
1029 "correct function"),
1030 @LogMessageDoc(level="WARN",
1031 message="Received ERROR from sw {switch} that "
1032 +"indicates roles are not supported "
1033 +"but we have received a valid "
1034 +"role reply earlier",
1035 explanation="The switch sent a confusing message to the" +
1036 "controller")
1037 })
1038 protected void processOFMessage(OFMessage m)
1039 throws IOException, SwitchStateException {
1040 boolean shouldHandleMessage = false;
1041
1042 switch (m.getType()) {
1043 case HELLO:
1044 if (log.isTraceEnabled())
1045 log.trace("HELLO from {}", sw);
1046
1047 if (state.hsState.equals(HandshakeState.START)) {
1048 state.hsState = HandshakeState.HELLO;
1049 sendHelloConfiguration();
1050 } else {
1051 throw new SwitchStateException("Unexpected HELLO from "
1052 + sw);
1053 }
1054 break;
1055 case ECHO_REQUEST:
1056 OFEchoReply reply =
1057 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
1058 reply.setXid(m.getXid());
1059 sw.write(reply, null);
1060 break;
1061 case ECHO_REPLY:
1062 break;
1063 case FEATURES_REPLY:
1064 if (log.isTraceEnabled())
1065 log.trace("Features Reply from {}", sw);
1066
1067 sw.setFeaturesReply((OFFeaturesReply) m);
1068 if (state.hsState.equals(HandshakeState.HELLO)) {
1069 sendFeatureReplyConfiguration();
1070 state.hsState = HandshakeState.FEATURES_REPLY;
1071 // uncomment to enable "dumb" switches like cbench
1072 // state.hsState = HandshakeState.READY;
1073 // addSwitch(sw);
1074 } else {
1075 // return results to rest api caller
1076 sw.deliverOFFeaturesReply(m);
1077 // update database */
1078 updateActiveSwitchInfo(sw);
1079 }
1080 break;
1081 case GET_CONFIG_REPLY:
1082 if (log.isTraceEnabled())
1083 log.trace("Get config reply from {}", sw);
1084
1085 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
1086 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
1087 throw new SwitchStateException(em);
1088 }
1089 OFGetConfigReply cr = (OFGetConfigReply) m;
1090 if (cr.getMissSendLength() == (short)0xffff) {
1091 log.trace("Config Reply from {} confirms " +
1092 "miss length set to 0xffff", sw);
1093 } else {
1094 log.warn("Config Reply from {} has " +
1095 "miss length set to {}",
1096 sw, cr.getMissSendLength() & 0xffff);
1097 }
1098 state.hasGetConfigReply = true;
1099 checkSwitchReady();
1100 break;
1101 case VENDOR:
1102 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1103 break;
1104 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001105 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001106 // TODO: we need better error handling. Especially for
1107 // request/reply style message (stats, roles) we should have
1108 // a unified way to lookup the xid in the error message.
1109 // This will probable involve rewriting the way we handle
1110 // request/reply style messages.
1111 OFError error = (OFError) m;
1112 boolean shouldLogError = true;
1113 // TODO: should we check that firstRoleReplyReceived is false,
1114 // i.e., check only whether the first request fails?
1115 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1116 boolean isBadVendorError =
1117 (error.getErrorType() == OFError.OFErrorType.
1118 OFPET_BAD_REQUEST.getValue());
1119 // We expect to receive a bad vendor error when
1120 // we're connected to a switch that doesn't support
1121 // the Nicira vendor extensions (i.e. not OVS or
1122 // derived from OVS). By protocol, it should also be
1123 // BAD_VENDOR, but too many switch implementations
1124 // get it wrong and we can already check the xid()
1125 // so we can ignore the type with confidence that this
1126 // is not a spurious error
1127 shouldLogError = !isBadVendorError;
1128 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001129 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001130 if (state.firstRoleReplyReceived && (role != null)) {
1131 log.warn("Received ERROR from sw {} that "
1132 +"indicates roles are not supported "
1133 +"but we have received a valid "
1134 +"role reply earlier", sw);
1135 }
1136 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001137 Role requestedRole =
HIGUCHI Yutaeae374d2013-06-17 10:39:42 -07001138 sw.deliverRoleRequestNotSupportedEx(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001139 synchronized(roleChanger) {
1140 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001141 //This will now never happen. The Controller's role
1142 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001143 // the switch doesn't understand role request
1144 // messages and the current controller role is
1145 // slave. We need to disconnect the switch.
1146 // @see RoleChanger for rationale
Jonathan Hart9e92c512013-03-20 16:24:44 -07001147 log.warn("Closing {} channel because controller's role " +
1148 "is SLAVE", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001149 sw.getChannel().close();
1150 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001151 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001152 log.debug("Adding switch {} because we got an error" +
1153 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001154 // Controller's role is master: add to
1155 // active
1156 // TODO: check if clearing flow table is
1157 // right choice here.
1158 // Need to clear FlowMods before we add the switch
1159 // and dispatch updates otherwise we have a race condition.
1160 // TODO: switch update is async. Won't we still have a potential
1161 // race condition?
1162 sw.clearAllFlowMods();
1163 addSwitch(sw);
1164 }
1165 }
1166 }
1167 else {
1168 // TODO: Is this the right thing to do if we receive
1169 // some other error besides a bad vendor error?
1170 // Presumably that means the switch did actually
1171 // understand the role request message, but there
1172 // was some other error from processing the message.
1173 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1174 // error code, but it doesn't look like the Nicira
1175 // role request has that. Should check OVS source
1176 // code to see if it's possible for any other errors
1177 // to be returned.
1178 // If we received an error the switch is not
1179 // in the correct role, so we need to disconnect it.
1180 // We could also resend the request but then we need to
1181 // check if there are other pending request in which
1182 // case we shouldn't resend. If we do resend we need
1183 // to make sure that the switch eventually accepts one
1184 // of our requests or disconnect the switch. This feels
1185 // cumbersome.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001186 log.debug("Closing {} channel because we recieved an " +
1187 "error other than BAD_VENDOR", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001188 sw.getChannel().close();
1189 }
1190 }
1191 // Once we support OF 1.2, we'd add code to handle it here.
1192 //if (error.getXid() == state.ofRoleRequestXid) {
1193 //}
1194 if (shouldLogError)
1195 logError(sw, error);
1196 break;
1197 case STATS_REPLY:
1198 if (state.hsState.ordinal() <
1199 HandshakeState.FEATURES_REPLY.ordinal()) {
1200 String em = "Unexpected STATS_REPLY from " + sw;
1201 throw new SwitchStateException(em);
1202 }
1203 sw.deliverStatisticsReply(m);
1204 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1205 processSwitchDescReply();
1206 }
1207 break;
1208 case PORT_STATUS:
1209 // We want to update our port state info even if we're in
1210 // the slave role, but we only want to update storage if
1211 // we're the master (or equal).
1212 boolean updateStorage = state.hsState.
1213 equals(HandshakeState.READY) &&
1214 (sw.getRole() != Role.SLAVE);
1215 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1216 shouldHandleMessage = true;
1217 break;
1218
1219 default:
1220 shouldHandleMessage = true;
1221 break;
1222 }
1223
1224 if (shouldHandleMessage) {
1225 sw.getListenerReadLock().lock();
1226 try {
1227 if (sw.isConnected()) {
1228 if (!state.hsState.equals(HandshakeState.READY)) {
1229 log.debug("Ignoring message type {} received " +
1230 "from switch {} before switch is " +
1231 "fully configured.", m.getType(), sw);
1232 }
1233 // Check if the controller is in the slave role for the
1234 // switch. If it is, then don't dispatch the message to
1235 // the listeners.
1236 // TODO: Should we dispatch messages that we expect to
1237 // receive when we're in the slave role, e.g. port
1238 // status messages? Since we're "hiding" switches from
1239 // the listeners when we're in the slave role, then it
1240 // seems a little weird to dispatch port status messages
1241 // to them. On the other hand there might be special
1242 // modules that care about all of the connected switches
1243 // and would like to receive port status notifications.
1244 else if (sw.getRole() == Role.SLAVE) {
1245 // Don't log message if it's a port status message
1246 // since we expect to receive those from the switch
1247 // and don't want to emit spurious messages.
1248 if (m.getType() != OFType.PORT_STATUS) {
1249 log.debug("Ignoring message type {} received " +
1250 "from switch {} while in the slave role.",
1251 m.getType(), sw);
1252 }
1253 } else {
1254 handleMessage(sw, m, null);
1255 }
1256 }
1257 }
1258 finally {
1259 sw.getListenerReadLock().unlock();
1260 }
1261 }
1262 }
1263 }
1264
1265 // ****************
1266 // Message handlers
1267 // ****************
1268
1269 protected void handlePortStatusMessage(IOFSwitch sw,
1270 OFPortStatus m,
1271 boolean updateStorage) {
1272 short portNumber = m.getDesc().getPortNumber();
1273 OFPhysicalPort port = m.getDesc();
1274 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001275 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1276 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001277 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001278 if (!portDown) {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001279 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1280 try {
1281 this.updates.put(update);
1282 } catch (InterruptedException e) {
1283 log.error("Failure adding update to queue", e);
1284 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001285 } else {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001286 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1287 try {
1288 this.updates.put(update);
1289 } catch (InterruptedException e) {
1290 log.error("Failure adding update to queue", e);
1291 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001292 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001293 if (updateStorage)
1294 updatePortInfo(sw, port);
1295 log.debug("Port #{} modified for {}", portNumber, sw);
1296 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1297 sw.setPort(port);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001298 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1299 try {
1300 this.updates.put(update);
1301 } catch (InterruptedException e) {
1302 log.error("Failure adding update to queue", e);
1303 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001304 if (updateStorage)
1305 updatePortInfo(sw, port);
1306 log.debug("Port #{} added for {}", portNumber, sw);
1307 } else if (m.getReason() ==
1308 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1309 sw.deletePort(portNumber);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001310 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1311 try {
1312 this.updates.put(update);
1313 } catch (InterruptedException e) {
1314 log.error("Failure adding update to queue", e);
1315 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001316 if (updateStorage)
1317 removePortInfo(sw, portNumber);
1318 log.debug("Port #{} deleted for {}", portNumber, sw);
1319 }
1320 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1321 try {
1322 this.updates.put(update);
1323 } catch (InterruptedException e) {
1324 log.error("Failure adding update to queue", e);
1325 }
1326 }
1327
/**
 * flcontext_cache - Keep a thread local stack of contexts.
 * Each thread's stack starts out empty; flcontext_alloc() pops contexts
 * from it and flcontext_free() pushes them back.
 */
protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
    new ThreadLocal <Stack<FloodlightContext>> () {
        @Override
        protected Stack<FloodlightContext> initialValue() {
            return new Stack<FloodlightContext>();
        }
    };
1338
1339 /**
1340 * flcontext_alloc - pop a context off the stack, if required create a new one
1341 * @return FloodlightContext
1342 */
1343 protected static FloodlightContext flcontext_alloc() {
1344 FloodlightContext flcontext = null;
1345
1346 if (flcontext_cache.get().empty()) {
1347 flcontext = new FloodlightContext();
1348 }
1349 else {
1350 flcontext = flcontext_cache.get().pop();
1351 }
1352
1353 return flcontext;
1354 }
1355
1356 /**
1357 * flcontext_free - Free the context to the current thread
1358 * @param flcontext
1359 */
1360 protected void flcontext_free(FloodlightContext flcontext) {
1361 flcontext.getStorage().clear();
1362 flcontext_cache.get().push(flcontext);
1363 }
1364
1365 /**
1366 * Handle replies to certain OFMessages, and pass others off to listeners
1367 * @param sw The switch for the message
1368 * @param m The message
1369 * @param bContext The floodlight context. If null then floodlight context would
1370 * be allocated in this function
1371 * @throws IOException
1372 */
1373 @LogMessageDocs({
1374 @LogMessageDoc(level="ERROR",
1375 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1376 " field is empty.",
1377 explanation="The switch sent an improperly-formatted PacketIn" +
1378 " message",
1379 recommendation=LogMessageDoc.CHECK_SWITCH),
1380 @LogMessageDoc(level="WARN",
1381 message="Unhandled OF Message: {} from {}",
1382 explanation="The switch sent a message not handled by " +
1383 "the controller")
1384 })
1385 protected void handleMessage(IOFSwitch sw, OFMessage m,
1386 FloodlightContext bContext)
1387 throws IOException {
1388 Ethernet eth = null;
1389
1390 switch (m.getType()) {
1391 case PACKET_IN:
1392 OFPacketIn pi = (OFPacketIn)m;
1393
1394 if (pi.getPacketData().length <= 0) {
1395 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1396 ") because the data field is empty.");
1397 return;
1398 }
1399
1400 if (Controller.ALWAYS_DECODE_ETH) {
1401 eth = new Ethernet();
1402 eth.deserialize(pi.getPacketData(), 0,
1403 pi.getPacketData().length);
1404 counterStore.updatePacketInCounters(sw, m, eth);
1405 }
1406 // fall through to default case...
1407
1408 default:
1409
1410 List<IOFMessageListener> listeners = null;
1411 if (messageListeners.containsKey(m.getType())) {
1412 listeners = messageListeners.get(m.getType()).
1413 getOrderedListeners();
1414 }
1415
1416 FloodlightContext bc = null;
1417 if (listeners != null) {
1418 // Check if floodlight context is passed from the calling
1419 // function, if so use that floodlight context, otherwise
1420 // allocate one
1421 if (bContext == null) {
1422 bc = flcontext_alloc();
1423 } else {
1424 bc = bContext;
1425 }
1426 if (eth != null) {
1427 IFloodlightProviderService.bcStore.put(bc,
1428 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1429 eth);
1430 }
1431
1432 // Get the starting time (overall and per-component) of
1433 // the processing chain for this packet if performance
1434 // monitoring is turned on
mininet73e7fb72013-12-03 14:25:53 -08001435
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001436 Command cmd;
1437 for (IOFMessageListener listener : listeners) {
1438 if (listener instanceof IOFSwitchFilter) {
1439 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1440 continue;
1441 }
1442 }
1443
mininet73e7fb72013-12-03 14:25:53 -08001444
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001445 cmd = listener.receive(sw, m, bc);
mininet73e7fb72013-12-03 14:25:53 -08001446
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001447
1448 if (Command.STOP.equals(cmd)) {
1449 break;
1450 }
1451 }
mininet73e7fb72013-12-03 14:25:53 -08001452
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001453 } else {
1454 log.warn("Unhandled OF Message: {} from {}", m, sw);
1455 }
1456
1457 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1458 }
1459 }
1460
1461 /**
1462 * Log an OpenFlow error message from a switch
1463 * @param sw The switch that sent the error
1464 * @param error The error message
1465 */
1466 @LogMessageDoc(level="ERROR",
1467 message="Error {error type} {error code} from {switch}",
1468 explanation="The switch responded with an unexpected error" +
1469 "to an OpenFlow message from the controller",
1470 recommendation="This could indicate improper network operation. " +
1471 "If the problem persists restarting the switch and " +
1472 "controller may help."
1473 )
1474 protected void logError(IOFSwitch sw, OFError error) {
1475 int etint = 0xffff & error.getErrorType();
1476 if (etint < 0 || etint >= OFErrorType.values().length) {
1477 log.error("Unknown error code {} from sw {}", etint, sw);
1478 }
1479 OFErrorType et = OFErrorType.values()[etint];
1480 switch (et) {
1481 case OFPET_HELLO_FAILED:
1482 OFHelloFailedCode hfc =
1483 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1484 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1485 break;
1486 case OFPET_BAD_REQUEST:
1487 OFBadRequestCode brc =
1488 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1489 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1490 break;
1491 case OFPET_BAD_ACTION:
1492 OFBadActionCode bac =
1493 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1494 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1495 break;
1496 case OFPET_FLOW_MOD_FAILED:
1497 OFFlowModFailedCode fmfc =
1498 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1499 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1500 break;
1501 case OFPET_PORT_MOD_FAILED:
1502 OFPortModFailedCode pmfc =
1503 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1504 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1505 break;
1506 case OFPET_QUEUE_OP_FAILED:
1507 OFQueueOpFailedCode qofc =
1508 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1509 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1510 break;
1511 default:
1512 break;
1513 }
1514 }
1515
1516 /**
1517 * Add a switch to the active switch list and call the switch listeners.
1518 * This happens either when a switch first connects (and the controller is
1519 * not in the slave role) or when the role of the controller changes from
1520 * slave to master.
1521 * @param sw the switch that has been added
1522 */
1523 // TODO: need to rethink locking and the synchronous switch update.
1524 // We can / should also handle duplicate DPIDs in connectedSwitches
1525 @LogMessageDoc(level="ERROR",
1526 message="New switch added {switch} for already-added switch {switch}",
1527 explanation="A switch with the same DPID as another switch " +
1528 "connected to the controller. This can be caused by " +
1529 "multiple switches configured with the same DPID, or " +
1530 "by a switch reconnected very quickly after " +
1531 "disconnecting.",
1532 recommendation="If this happens repeatedly, it is likely there " +
1533 "are switches with duplicate DPIDs on the network. " +
1534 "Reconfigure the appropriate switches. If it happens " +
1535 "very rarely, then it is likely this is a transient " +
1536 "network problem that can be ignored."
1537 )
1538 protected void addSwitch(IOFSwitch sw) {
1539 // TODO: is it safe to modify the HashMap without holding
1540 // the old switch's lock?
1541 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1542 if (sw == oldSw) {
1543 // Note == for object equality, not .equals for value
1544 log.info("New add switch for pre-existing switch {}", sw);
1545 return;
1546 }
1547
1548 if (oldSw != null) {
1549 oldSw.getListenerWriteLock().lock();
1550 try {
1551 log.error("New switch added {} for already-added switch {}",
1552 sw, oldSw);
1553 // Set the connected flag to false to suppress calling
1554 // the listeners for this switch in processOFMessage
1555 oldSw.setConnected(false);
1556
1557 oldSw.cancelAllStatisticsReplies();
1558
1559 updateInactiveSwitchInfo(oldSw);
1560
1561 // we need to clean out old switch state definitively
1562 // before adding the new switch
1563 // FIXME: It seems not completely kosher to call the
1564 // switch listeners here. I thought one of the points of
1565 // having the asynchronous switch update mechanism was so
1566 // the addedSwitch and removedSwitch were always called
1567 // from a single thread to simplify concurrency issues
1568 // for the listener.
1569 if (switchListeners != null) {
1570 for (IOFSwitchListener listener : switchListeners) {
1571 listener.removedSwitch(oldSw);
1572 }
1573 }
1574 // will eventually trigger a removeSwitch(), which will cause
1575 // a "Not removing Switch ... already removed debug message.
1576 // TODO: Figure out a way to handle this that avoids the
1577 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001578 log.debug("Closing {} because a new IOFSwitch got added " +
1579 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001580 oldSw.getChannel().close();
1581 }
1582 finally {
1583 oldSw.getListenerWriteLock().unlock();
1584 }
1585 }
1586
1587 updateActiveSwitchInfo(sw);
1588 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1589 try {
1590 this.updates.put(update);
1591 } catch (InterruptedException e) {
1592 log.error("Failure adding update to queue", e);
1593 }
1594 }
1595
1596 /**
1597 * Remove a switch from the active switch list and call the switch listeners.
1598 * This happens either when the switch is disconnected or when the
1599 * controller's role for the switch changes from master to slave.
1600 * @param sw the switch that has been removed
1601 */
1602 protected void removeSwitch(IOFSwitch sw) {
1603 // No need to acquire the listener lock, since
1604 // this method is only called after netty has processed all
1605 // pending messages
1606 log.debug("removeSwitch: {}", sw);
1607 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1608 log.debug("Not removing switch {}; already removed", sw);
1609 return;
1610 }
1611 // We cancel all outstanding statistics replies if the switch transition
1612 // from active. In the future we might allow statistics requests
1613 // from slave controllers. Then we need to move this cancelation
1614 // to switch disconnect
1615 sw.cancelAllStatisticsReplies();
1616
1617 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1618 // here if role support is enabled. In that case if the switch is being
1619 // removed because we've been switched to being in the slave role, then I think
1620 // it's possible that the new master may have already been promoted to master
1621 // and written out the active switch state to storage. If we now execute
1622 // updateInactiveSwitchInfo we may wipe out all of the state that was
1623 // written out by the new master. Maybe need to revisit how we handle all
1624 // of the switch state that's written to storage.
1625
1626 updateInactiveSwitchInfo(sw);
1627 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1628 try {
1629 this.updates.put(update);
1630 } catch (InterruptedException e) {
1631 log.error("Failure adding update to queue", e);
1632 }
1633 }
1634
1635 // ***************
1636 // IFloodlightProvider
1637 // ***************
1638
1639 @Override
1640 public synchronized void addOFMessageListener(OFType type,
1641 IOFMessageListener listener) {
1642 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1643 messageListeners.get(type);
1644 if (ldd == null) {
1645 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1646 messageListeners.put(type, ldd);
1647 }
1648 ldd.addListener(type, listener);
1649 }
1650
1651 @Override
1652 public synchronized void removeOFMessageListener(OFType type,
1653 IOFMessageListener listener) {
1654 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1655 messageListeners.get(type);
1656 if (ldd != null) {
1657 ldd.removeListener(listener);
1658 }
1659 }
1660
1661 private void logListeners() {
1662 for (Map.Entry<OFType,
1663 ListenerDispatcher<OFType,
1664 IOFMessageListener>> entry
1665 : messageListeners.entrySet()) {
1666
1667 OFType type = entry.getKey();
1668 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1669 entry.getValue();
1670
1671 StringBuffer sb = new StringBuffer();
1672 sb.append("OFListeners for ");
1673 sb.append(type);
1674 sb.append(": ");
1675 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1676 sb.append(l.getName());
1677 sb.append(",");
1678 }
1679 log.debug(sb.toString());
1680 }
1681 }
1682
1683 public void removeOFMessageListeners(OFType type) {
1684 messageListeners.remove(type);
1685 }
1686
1687 @Override
1688 public Map<Long, IOFSwitch> getSwitches() {
1689 return Collections.unmodifiableMap(this.activeSwitches);
1690 }
1691
1692 @Override
1693 public void addOFSwitchListener(IOFSwitchListener listener) {
1694 this.switchListeners.add(listener);
1695 }
1696
1697 @Override
1698 public void removeOFSwitchListener(IOFSwitchListener listener) {
1699 this.switchListeners.remove(listener);
1700 }
1701
1702 @Override
1703 public Map<OFType, List<IOFMessageListener>> getListeners() {
1704 Map<OFType, List<IOFMessageListener>> lers =
1705 new HashMap<OFType, List<IOFMessageListener>>();
1706 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1707 messageListeners.entrySet()) {
1708 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1709 }
1710 return Collections.unmodifiableMap(lers);
1711 }
1712
1713 @Override
1714 @LogMessageDocs({
1715 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1716 "a null switch",
1717 explanation="Failed to process a message because the switch " +
1718 " is no longer connected."),
1719 @LogMessageDoc(level="ERROR",
1720 message="Error reinjecting OFMessage on switch {switch}",
1721 explanation="An I/O error occured while attempting to " +
1722 "process an OpenFlow message",
1723 recommendation=LogMessageDoc.CHECK_SWITCH)
1724 })
1725 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1726 FloodlightContext bc) {
1727 if (sw == null) {
1728 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1729 return false;
1730 }
1731
1732 // FIXME: Do we need to be able to inject messages to switches
1733 // where we're the slave controller (i.e. they're connected but
1734 // not active)?
1735 // FIXME: Don't we need synchronization logic here so we're holding
1736 // the listener read lock when we call handleMessage? After some
1737 // discussions it sounds like the right thing to do here would be to
1738 // inject the message as a netty upstream channel event so it goes
1739 // through the normal netty event processing, including being
1740 // handled
1741 if (!activeSwitches.containsKey(sw.getId())) return false;
1742
1743 try {
1744 // Pass Floodlight context to the handleMessages()
1745 handleMessage(sw, msg, bc);
1746 } catch (IOException e) {
1747 log.error("Error reinjecting OFMessage on switch {}",
1748 HexString.toHexString(sw.getId()));
1749 return false;
1750 }
1751 return true;
1752 }
1753
1754 @Override
1755 @LogMessageDoc(message="Calling System.exit",
1756 explanation="The controller is terminating")
1757 public synchronized void terminate() {
1758 log.info("Calling System.exit");
1759 System.exit(1);
1760 }
1761
1762 @Override
1763 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1764 // call the overloaded version with floodlight context set to null
1765 return injectOfMessage(sw, msg, null);
1766 }
1767
1768 @Override
1769 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1770 FloodlightContext bc) {
1771 if (log.isTraceEnabled()) {
1772 String str = OFMessage.getDataAsString(sw, m, bc);
1773 log.trace("{}", str);
1774 }
1775
1776 List<IOFMessageListener> listeners = null;
1777 if (messageListeners.containsKey(m.getType())) {
1778 listeners =
1779 messageListeners.get(m.getType()).getOrderedListeners();
1780 }
1781
1782 if (listeners != null) {
1783 for (IOFMessageListener listener : listeners) {
1784 if (listener instanceof IOFSwitchFilter) {
1785 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1786 continue;
1787 }
1788 }
1789 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1790 break;
1791 }
1792 }
1793 }
1794 }
1795
1796 @Override
1797 public BasicFactory getOFMessageFactory() {
1798 return factory;
1799 }
1800
1801 @Override
1802 public String getControllerId() {
1803 return controllerId;
1804 }
1805
1806 // **************
1807 // Initialization
1808 // **************
1809
1810 protected void updateAllInactiveSwitchInfo() {
1811 if (role == Role.SLAVE) {
1812 return;
1813 }
1814 String controllerId = getControllerId();
1815 String[] switchColumns = { SWITCH_DATAPATH_ID,
1816 SWITCH_CONTROLLER_ID,
1817 SWITCH_ACTIVE };
1818 String[] portColumns = { PORT_ID, PORT_SWITCH };
1819 IResultSet switchResultSet = null;
1820 try {
1821 OperatorPredicate op =
1822 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1823 OperatorPredicate.Operator.EQ,
1824 controllerId);
1825 switchResultSet =
1826 storageSource.executeQuery(SWITCH_TABLE_NAME,
1827 switchColumns,
1828 op, null);
1829 while (switchResultSet.next()) {
1830 IResultSet portResultSet = null;
1831 try {
1832 String datapathId =
1833 switchResultSet.getString(SWITCH_DATAPATH_ID);
1834 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1835 op = new OperatorPredicate(PORT_SWITCH,
1836 OperatorPredicate.Operator.EQ,
1837 datapathId);
1838 portResultSet =
1839 storageSource.executeQuery(PORT_TABLE_NAME,
1840 portColumns,
1841 op, null);
1842 while (portResultSet.next()) {
1843 portResultSet.deleteRow();
1844 }
1845 portResultSet.save();
1846 }
1847 finally {
1848 if (portResultSet != null)
1849 portResultSet.close();
1850 }
1851 }
1852 switchResultSet.save();
1853 }
1854 finally {
1855 if (switchResultSet != null)
1856 switchResultSet.close();
1857 }
1858 }
1859
1860 protected void updateControllerInfo() {
1861 updateAllInactiveSwitchInfo();
1862
1863 // Write out the controller info to the storage source
1864 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1865 String id = getControllerId();
1866 controllerInfo.put(CONTROLLER_ID, id);
1867 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1868 }
1869
1870 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1871 if (role == Role.SLAVE) {
1872 return;
1873 }
1874 // Obtain the row info for the switch
1875 Map<String, Object> switchInfo = new HashMap<String, Object>();
1876 String datapathIdString = sw.getStringId();
1877 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1878 String controllerId = getControllerId();
1879 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1880 Date connectedSince = sw.getConnectedSince();
1881 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1882 Channel channel = sw.getChannel();
1883 SocketAddress socketAddress = channel.getRemoteAddress();
1884 if (socketAddress != null) {
1885 String socketAddressString = socketAddress.toString();
1886 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1887 if (socketAddress instanceof InetSocketAddress) {
1888 InetSocketAddress inetSocketAddress =
1889 (InetSocketAddress)socketAddress;
1890 InetAddress inetAddress = inetSocketAddress.getAddress();
1891 String ip = inetAddress.getHostAddress();
1892 switchInfo.put(SWITCH_IP, ip);
1893 }
1894 }
1895
1896 // Write out the switch features info
1897 long capabilities = U32.f(sw.getCapabilities());
1898 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1899 long buffers = U32.f(sw.getBuffers());
1900 switchInfo.put(SWITCH_BUFFERS, buffers);
1901 long tables = U32.f(sw.getTables());
1902 switchInfo.put(SWITCH_TABLES, tables);
1903 long actions = U32.f(sw.getActions());
1904 switchInfo.put(SWITCH_ACTIONS, actions);
1905 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1906
1907 // Update the switch
1908 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1909
1910 // Update the ports
1911 for (OFPhysicalPort port: sw.getPorts()) {
1912 updatePortInfo(sw, port);
1913 }
1914 }
1915
1916 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1917 if (role == Role.SLAVE) {
1918 return;
1919 }
1920 log.debug("Update DB with inactiveSW {}", sw);
1921 // Update the controller info in the storage source to be inactive
1922 Map<String, Object> switchInfo = new HashMap<String, Object>();
1923 String datapathIdString = sw.getStringId();
1924 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1925 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1926 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1927 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1928 }
1929
1930 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1931 if (role == Role.SLAVE) {
1932 return;
1933 }
1934 String datapathIdString = sw.getStringId();
1935 Map<String, Object> portInfo = new HashMap<String, Object>();
1936 int portNumber = U16.f(port.getPortNumber());
1937 String id = datapathIdString + "|" + portNumber;
1938 portInfo.put(PORT_ID, id);
1939 portInfo.put(PORT_SWITCH, datapathIdString);
1940 portInfo.put(PORT_NUMBER, portNumber);
1941 byte[] hardwareAddress = port.getHardwareAddress();
1942 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1943 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1944 String name = port.getName();
1945 portInfo.put(PORT_NAME, name);
1946 long config = U32.f(port.getConfig());
1947 portInfo.put(PORT_CONFIG, config);
1948 long state = U32.f(port.getState());
1949 portInfo.put(PORT_STATE, state);
1950 long currentFeatures = U32.f(port.getCurrentFeatures());
1951 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1952 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1953 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1954 long supportedFeatures = U32.f(port.getSupportedFeatures());
1955 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1956 long peerFeatures = U32.f(port.getPeerFeatures());
1957 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1958 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1959 }
1960
1961 /**
1962 * Read switch port data from storage and write it into a switch object
1963 * @param sw the switch to update
1964 */
1965 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1966 OperatorPredicate op =
1967 new OperatorPredicate(PORT_SWITCH,
1968 OperatorPredicate.Operator.EQ,
1969 sw.getStringId());
1970 IResultSet portResultSet =
1971 storageSource.executeQuery(PORT_TABLE_NAME,
1972 null, op, null);
1973 //Map<Short, OFPhysicalPort> oldports =
1974 // new HashMap<Short, OFPhysicalPort>();
1975 //oldports.putAll(sw.getPorts());
1976
1977 while (portResultSet.next()) {
1978 try {
1979 OFPhysicalPort p = new OFPhysicalPort();
1980 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1981 p.setName(portResultSet.getString(PORT_NAME));
1982 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1983 p.setState((int)portResultSet.getLong(PORT_STATE));
1984 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1985 p.setHardwareAddress(HexString.fromHexString(portMac));
1986 p.setCurrentFeatures((int)portResultSet.
1987 getLong(PORT_CURRENT_FEATURES));
1988 p.setAdvertisedFeatures((int)portResultSet.
1989 getLong(PORT_ADVERTISED_FEATURES));
1990 p.setSupportedFeatures((int)portResultSet.
1991 getLong(PORT_SUPPORTED_FEATURES));
1992 p.setPeerFeatures((int)portResultSet.
1993 getLong(PORT_PEER_FEATURES));
1994 //oldports.remove(Short.valueOf(p.getPortNumber()));
1995 sw.setPort(p);
1996 } catch (NullPointerException e) {
1997 // ignore
1998 }
1999 }
2000 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
2001 try {
2002 this.updates.put(update);
2003 } catch (InterruptedException e) {
2004 log.error("Failure adding update to queue", e);
2005 }
2006 }
2007
2008 protected void removePortInfo(IOFSwitch sw, short portNumber) {
2009 if (role == Role.SLAVE) {
2010 return;
2011 }
2012 String datapathIdString = sw.getStringId();
2013 String id = datapathIdString + "|" + portNumber;
2014 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
2015 }
2016
2017 /**
2018 * Sets the initial role based on properties in the config params.
2019 * It looks for two different properties.
2020 * If the "role" property is specified then the value should be
2021 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
2022 * controller is set to the specified value. If the "role" property
2023 * is not specified then it looks next for the "role.path" property.
2024 * In this case the value should be the path to a property file in
2025 * the file system that contains a property called "floodlight.role"
2026 * which can be one of the values listed above for the "role" property.
2027 * The idea behind the "role.path" mechanism is that you have some
2028 * separate heartbeat and master controller election algorithm that
2029 * determines the role of the controller. When a role transition happens,
2030 * it updates the current role in the file specified by the "role.path"
2031 * file. Then if floodlight restarts for some reason it can get the
2032 * correct current role of the controller from the file.
2033 * @param configParams The config params for the FloodlightProvider service
2034 * @return A valid role if role information is specified in the
2035 * config params, otherwise null
2036 */
2037 @LogMessageDocs({
2038 @LogMessageDoc(message="Controller role set to {role}",
2039 explanation="Setting the initial HA role to "),
2040 @LogMessageDoc(level="ERROR",
2041 message="Invalid current role value: {role}",
2042 explanation="An invalid HA role value was read from the " +
2043 "properties file",
2044 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2045 })
2046 protected Role getInitialRole(Map<String, String> configParams) {
2047 Role role = null;
2048 String roleString = configParams.get("role");
2049 if (roleString == null) {
2050 String rolePath = configParams.get("rolepath");
2051 if (rolePath != null) {
2052 Properties properties = new Properties();
2053 try {
2054 properties.load(new FileInputStream(rolePath));
2055 roleString = properties.getProperty("floodlight.role");
2056 }
2057 catch (IOException exc) {
2058 // Don't treat it as an error if the file specified by the
2059 // rolepath property doesn't exist. This lets us enable the
2060 // HA mechanism by just creating/setting the floodlight.role
2061 // property in that file without having to modify the
2062 // floodlight properties.
2063 }
2064 }
2065 }
2066
2067 if (roleString != null) {
2068 // Canonicalize the string to the form used for the enum constants
2069 roleString = roleString.trim().toUpperCase();
2070 try {
2071 role = Role.valueOf(roleString);
2072 }
2073 catch (IllegalArgumentException exc) {
2074 log.error("Invalid current role value: {}", roleString);
2075 }
2076 }
2077
2078 log.info("Controller role set to {}", role);
2079
2080 return role;
2081 }
2082
2083 /**
2084 * Tell controller that we're ready to accept switches loop
2085 * @throws IOException
2086 */
2087 @LogMessageDocs({
2088 @LogMessageDoc(message="Listening for switch connections on {address}",
2089 explanation="The controller is ready and listening for new" +
2090 " switch connections"),
2091 @LogMessageDoc(message="Storage exception in controller " +
2092 "updates loop; terminating process",
2093 explanation=ERROR_DATABASE,
2094 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2095 @LogMessageDoc(level="ERROR",
2096 message="Exception in controller updates loop",
2097 explanation="Failed to dispatch controller event",
2098 recommendation=LogMessageDoc.GENERIC_ACTION)
2099 })
2100 public void run() {
2101 if (log.isDebugEnabled()) {
2102 logListeners();
2103 }
2104
2105 try {
2106 final ServerBootstrap bootstrap = createServerBootStrap();
2107
2108 bootstrap.setOption("reuseAddr", true);
2109 bootstrap.setOption("child.keepAlive", true);
2110 bootstrap.setOption("child.tcpNoDelay", true);
2111 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2112
2113 ChannelPipelineFactory pfact =
2114 new OpenflowPipelineFactory(this, null);
2115 bootstrap.setPipelineFactory(pfact);
2116 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2117 final ChannelGroup cg = new DefaultChannelGroup();
2118 cg.add(bootstrap.bind(sa));
2119
2120 log.info("Listening for switch connections on {}", sa);
2121 } catch (Exception e) {
2122 throw new RuntimeException(e);
2123 }
2124
2125 // main loop
2126 while (true) {
2127 try {
2128 IUpdate update = updates.take();
2129 update.dispatch();
2130 } catch (InterruptedException e) {
2131 return;
2132 } catch (StorageException e) {
2133 log.error("Storage exception in controller " +
2134 "updates loop; terminating process", e);
2135 return;
2136 } catch (Exception e) {
2137 log.error("Exception in controller updates loop", e);
2138 }
2139 }
2140 }
2141
2142 private ServerBootstrap createServerBootStrap() {
2143 if (workerThreads == 0) {
2144 return new ServerBootstrap(
2145 new NioServerSocketChannelFactory(
2146 Executors.newCachedThreadPool(),
2147 Executors.newCachedThreadPool()));
2148 } else {
2149 return new ServerBootstrap(
2150 new NioServerSocketChannelFactory(
2151 Executors.newCachedThreadPool(),
2152 Executors.newCachedThreadPool(), workerThreads));
2153 }
2154 }
2155
2156 public void setConfigParams(Map<String, String> configParams) {
2157 String ofPort = configParams.get("openflowport");
2158 if (ofPort != null) {
2159 this.openFlowPort = Integer.parseInt(ofPort);
2160 }
2161 log.debug("OpenFlow port set to {}", this.openFlowPort);
2162 String threads = configParams.get("workerthreads");
2163 if (threads != null) {
2164 this.workerThreads = Integer.parseInt(threads);
2165 }
2166 log.debug("Number of worker threads set to {}", this.workerThreads);
2167 String controllerId = configParams.get("controllerid");
2168 if (controllerId != null) {
2169 this.controllerId = controllerId;
2170 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002171 else {
2172 //Try to get the hostname of the machine and use that for controller ID
2173 try {
2174 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2175 this.controllerId = hostname;
2176 } catch (UnknownHostException e) {
2177 // Can't get hostname, we'll just use the default
2178 }
2179 }
2180
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002181 log.debug("ControllerId set to {}", this.controllerId);
2182 }
2183
2184 private void initVendorMessages() {
2185 // Configure openflowj to be able to parse the role request/reply
2186 // vendor messages.
2187 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2188 OFNiciraVendorData.NX_VENDOR_ID, 4);
2189 OFVendorId.registerVendorId(niciraVendorId);
2190 OFBasicVendorDataType roleRequestVendorData =
2191 new OFBasicVendorDataType(
2192 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2193 OFRoleRequestVendorData.getInstantiable());
2194 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2195 OFBasicVendorDataType roleReplyVendorData =
2196 new OFBasicVendorDataType(
2197 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2198 OFRoleReplyVendorData.getInstantiable());
2199 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2200 }
2201
2202 /**
2203 * Initialize internal data structures
2204 */
2205 public void init(Map<String, String> configParams) {
2206 // These data structures are initialized here because other
2207 // module's startUp() might be called before ours
2208 this.messageListeners =
2209 new ConcurrentHashMap<OFType,
2210 ListenerDispatcher<OFType,
2211 IOFMessageListener>>();
2212 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2213 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2214 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2215 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2216 this.controllerNodeIPsCache = new HashMap<String, String>();
2217 this.updates = new LinkedBlockingQueue<IUpdate>();
2218 this.factory = new BasicFactory();
2219 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2220 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002221 //this.role = getInitialRole(configParams);
2222 //Set the controller's role to MASTER so it always tries to do role requests.
2223 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002224 this.roleChanger = new RoleChanger();
2225 initVendorMessages();
2226 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002227 }
2228
2229 /**
2230 * Startup all of the controller's components
2231 */
2232 @LogMessageDoc(message="Waiting for storage source",
2233 explanation="The system database is not yet ready",
2234 recommendation="If this message persists, this indicates " +
2235 "that the system database has failed to start. " +
2236 LogMessageDoc.CHECK_CONTROLLER)
2237 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002238 try {
2239 registryService.registerController(controllerId);
2240 } catch (RegistryException e2) {
2241 log.warn("Registry service error: {}", e2.getMessage());
2242 }
2243
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002244 // Create the table names we use
2245 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2246 storageSource.createTable(SWITCH_TABLE_NAME, null);
2247 storageSource.createTable(PORT_TABLE_NAME, null);
2248 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2249 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2250 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2251 CONTROLLER_ID);
2252 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2253 SWITCH_DATAPATH_ID);
2254 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2255 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2256 CONTROLLER_INTERFACE_ID);
2257 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2258
2259 while (true) {
2260 try {
2261 updateControllerInfo();
2262 break;
2263 }
2264 catch (StorageException e) {
2265 log.info("Waiting for storage source");
2266 try {
2267 Thread.sleep(1000);
2268 } catch (InterruptedException e1) {
2269 }
2270 }
2271 }
2272
2273 // Add our REST API
2274 restApi.addRestletRoutable(new CoreWebRoutable());
2275 }
2276
2277 @Override
2278 public void addInfoProvider(String type, IInfoProvider provider) {
2279 if (!providerMap.containsKey(type)) {
2280 providerMap.put(type, new ArrayList<IInfoProvider>());
2281 }
2282 providerMap.get(type).add(provider);
2283 }
2284
2285 @Override
2286 public void removeInfoProvider(String type, IInfoProvider provider) {
2287 if (!providerMap.containsKey(type)) {
2288 log.debug("Provider type {} doesn't exist.", type);
2289 return;
2290 }
2291
2292 providerMap.get(type).remove(provider);
2293 }
2294
2295 public Map<String, Object> getControllerInfo(String type) {
2296 if (!providerMap.containsKey(type)) return null;
2297
2298 Map<String, Object> result = new LinkedHashMap<String, Object>();
2299 for (IInfoProvider provider : providerMap.get(type)) {
2300 result.putAll(provider.getInfo(type));
2301 }
2302
2303 return result;
2304 }
2305
2306 @Override
2307 public void addHAListener(IHAListener listener) {
2308 this.haListeners.add(listener);
2309 }
2310
2311 @Override
2312 public void removeHAListener(IHAListener listener) {
2313 this.haListeners.remove(listener);
2314 }
2315
2316
2317 /**
2318 * Handle changes to the controller nodes IPs and dispatch update.
2319 */
2320 @SuppressWarnings("unchecked")
2321 protected void handleControllerNodeIPChanges() {
2322 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2323 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2324 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2325 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2326 CONTROLLER_INTERFACE_TYPE,
2327 CONTROLLER_INTERFACE_NUMBER,
2328 CONTROLLER_INTERFACE_DISCOVERED_IP };
2329 synchronized(controllerNodeIPsCache) {
2330 // We currently assume that interface Ethernet0 is the relevant
2331 // controller interface. Might change.
2332 // We could (should?) implement this using
2333 // predicates, but creating the individual and compound predicate
2334 // seems more overhead then just checking every row. Particularly,
2335 // since the number of rows is small and changes infrequent
2336 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2337 colNames,null, null);
2338 while (res.next()) {
2339 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2340 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2341 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2342 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2343 String curIP = controllerNodeIPsCache.get(controllerID);
2344
2345 curControllerNodeIPs.put(controllerID, discoveredIP);
2346 if (curIP == null) {
2347 // new controller node IP
2348 addedControllerNodeIPs.put(controllerID, discoveredIP);
2349 }
HIGUCHI Yuta63b30722013-10-18 18:33:46 -07002350 else if (!curIP.equals(discoveredIP)) {
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002351 // IP changed
2352 removedControllerNodeIPs.put(controllerID, curIP);
2353 addedControllerNodeIPs.put(controllerID, discoveredIP);
2354 }
2355 }
2356 }
2357 // Now figure out if rows have been deleted. We can't use the
2358 // rowKeys from rowsDeleted directly, since the tables primary
2359 // key is a compound that we can't disassemble
2360 Set<String> curEntries = curControllerNodeIPs.keySet();
2361 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2362 removedEntries.removeAll(curEntries);
2363 for (String removedControllerID : removedEntries)
2364 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2365 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2366 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2367 curControllerNodeIPs, addedControllerNodeIPs,
2368 removedControllerNodeIPs);
2369 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2370 try {
2371 this.updates.put(update);
2372 } catch (InterruptedException e) {
2373 log.error("Failure adding update to queue", e);
2374 }
2375 }
2376 }
2377 }
2378
2379 @Override
2380 public Map<String, String> getControllerNodeIPs() {
2381 // We return a copy of the mapping so we can guarantee that
2382 // the mapping return is the same as one that will be (or was)
2383 // dispatched to IHAListeners
2384 HashMap<String,String> retval = new HashMap<String,String>();
2385 synchronized(controllerNodeIPsCache) {
2386 retval.putAll(controllerNodeIPsCache);
2387 }
2388 return retval;
2389 }
2390
2391 @Override
2392 public void rowsModified(String tableName, Set<Object> rowKeys) {
2393 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2394 handleControllerNodeIPChanges();
2395 }
2396
2397 }
2398
2399 @Override
2400 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2401 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2402 handleControllerNodeIPChanges();
2403 }
2404 }
2405
2406 @Override
2407 public long getSystemStartTime() {
2408 return (this.systemStartTime);
2409 }
2410
2411 @Override
2412 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2413 this.alwaysClearFlowsOnSwAdd = value;
2414 }
2415
2416 public boolean getAlwaysClearFlowsOnSwAdd() {
2417 return this.alwaysClearFlowsOnSwAdd;
2418 }
2419}