blob: 24eeab5870943502c97d6f4d9f532637fc66aa4d [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.INetMapStorage.DM_OPERATION;
58import net.floodlightcontroller.core.INetMapTopologyService.ITopoRouteService;
59import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080060import net.floodlightcontroller.core.IOFSwitch;
61import net.floodlightcontroller.core.IOFSwitchFilter;
62import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berde8557a462013-01-07 08:59:31 -080063import net.floodlightcontroller.core.ISwitchStorage.SwitchState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080064import net.floodlightcontroller.core.annotations.LogMessageDoc;
65import net.floodlightcontroller.core.annotations.LogMessageDocs;
66import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
67import net.floodlightcontroller.core.util.ListenerDispatcher;
68import net.floodlightcontroller.core.web.CoreWebRoutable;
69import net.floodlightcontroller.counter.ICounterStoreService;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -080070import net.floodlightcontroller.flowcache.IFlowService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080071import net.floodlightcontroller.packet.Ethernet;
72import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
73import net.floodlightcontroller.restserver.IRestApiService;
74import net.floodlightcontroller.storage.IResultSet;
75import net.floodlightcontroller.storage.IStorageSourceListener;
76import net.floodlightcontroller.storage.IStorageSourceService;
77import net.floodlightcontroller.storage.OperatorPredicate;
78import net.floodlightcontroller.storage.StorageException;
79import net.floodlightcontroller.threadpool.IThreadPoolService;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080080import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080081import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080082import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080083
84import org.jboss.netty.bootstrap.ServerBootstrap;
85import org.jboss.netty.buffer.ChannelBuffer;
86import org.jboss.netty.buffer.ChannelBuffers;
87import org.jboss.netty.channel.Channel;
88import org.jboss.netty.channel.ChannelHandlerContext;
89import org.jboss.netty.channel.ChannelPipelineFactory;
90import org.jboss.netty.channel.ChannelStateEvent;
91import org.jboss.netty.channel.ChannelUpstreamHandler;
92import org.jboss.netty.channel.Channels;
93import org.jboss.netty.channel.ExceptionEvent;
94import org.jboss.netty.channel.MessageEvent;
95import org.jboss.netty.channel.group.ChannelGroup;
96import org.jboss.netty.channel.group.DefaultChannelGroup;
97import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
98import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
99import org.jboss.netty.handler.timeout.IdleStateEvent;
100import org.jboss.netty.handler.timeout.ReadTimeoutException;
101import org.openflow.protocol.OFEchoReply;
102import org.openflow.protocol.OFError;
103import org.openflow.protocol.OFError.OFBadActionCode;
104import org.openflow.protocol.OFError.OFBadRequestCode;
105import org.openflow.protocol.OFError.OFErrorType;
106import org.openflow.protocol.OFError.OFFlowModFailedCode;
107import org.openflow.protocol.OFError.OFHelloFailedCode;
108import org.openflow.protocol.OFError.OFPortModFailedCode;
109import org.openflow.protocol.OFError.OFQueueOpFailedCode;
110import org.openflow.protocol.OFFeaturesReply;
111import org.openflow.protocol.OFGetConfigReply;
112import org.openflow.protocol.OFMessage;
113import org.openflow.protocol.OFPacketIn;
114import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800115import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800116import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800117import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800118import org.openflow.protocol.OFPortStatus.OFPortReason;
119import org.openflow.protocol.OFSetConfig;
120import org.openflow.protocol.OFStatisticsRequest;
121import org.openflow.protocol.OFSwitchConfig;
122import org.openflow.protocol.OFType;
123import org.openflow.protocol.OFVendor;
124import org.openflow.protocol.factory.BasicFactory;
125import org.openflow.protocol.factory.MessageParseException;
126import org.openflow.protocol.statistics.OFDescriptionStatistics;
127import org.openflow.protocol.statistics.OFStatistics;
128import org.openflow.protocol.statistics.OFStatisticsType;
129import org.openflow.protocol.vendor.OFBasicVendorDataType;
130import org.openflow.protocol.vendor.OFBasicVendorId;
131import org.openflow.protocol.vendor.OFVendorId;
132import org.openflow.util.HexString;
133import org.openflow.util.U16;
134import org.openflow.util.U32;
135import org.openflow.vendor.nicira.OFNiciraVendorData;
136import org.openflow.vendor.nicira.OFRoleReplyVendorData;
137import org.openflow.vendor.nicira.OFRoleRequestVendorData;
138import org.openflow.vendor.nicira.OFRoleVendorData;
139import org.slf4j.Logger;
140import org.slf4j.LoggerFactory;
141
142
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800143
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800144/**
145 * The main controller class. Handles all setup and network listeners
146 */
147public class Controller implements IFloodlightProviderService,
148 IStorageSourceListener {
Pankaj Berde29ab7fc2013-01-25 06:17:52 -0800149
150 ThreadLocal<SwitchStorageImpl> store = new ThreadLocal<SwitchStorageImpl>() {
151 @Override
152 protected SwitchStorageImpl initialValue() {
153 SwitchStorageImpl swStore = new SwitchStorageImpl();
154 //TODO: Get the file path from global properties
155 swStore.init("/tmp/cassandra.titan");
156 return swStore;
157 }
158 };
159
160 protected SwitchStorageImpl swStore = store.get();
Pankaj Berde8557a462013-01-07 08:59:31 -0800161
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800162 protected static Logger log = LoggerFactory.getLogger(Controller.class);
163
164 private static final String ERROR_DATABASE =
165 "The controller could not communicate with the system database.";
166
167 protected BasicFactory factory;
168 protected ConcurrentMap<OFType,
169 ListenerDispatcher<OFType,IOFMessageListener>>
170 messageListeners;
171 // The activeSwitches map contains only those switches that are actively
172 // being controlled by us -- it doesn't contain switches that are
173 // in the slave role
174 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
175 // connectedSwitches contains all connected switches, including ones where
176 // we're a slave controller. We need to keep track of them so that we can
177 // send role request messages to switches when our role changes to master
178 // We add a switch to this set after it successfully completes the
179 // handshake. Access to this Set needs to be synchronized with roleChanger
180 protected HashSet<OFSwitchImpl> connectedSwitches;
181
182 // The controllerNodeIPsCache maps Controller IDs to their IP address.
183 // It's only used by handleControllerNodeIPsChanged
184 protected HashMap<String, String> controllerNodeIPsCache;
185
186 protected Set<IOFSwitchListener> switchListeners;
187 protected Set<IHAListener> haListeners;
188 protected Map<String, List<IInfoProvider>> providerMap;
189 protected BlockingQueue<IUpdate> updates;
190
191 // Module dependencies
192 protected IRestApiService restApi;
193 protected ICounterStoreService counterStore = null;
194 protected IStorageSourceService storageSource;
195 protected IPktInProcessingTimeService pktinProcTime;
196 protected IThreadPoolService threadPool;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800197 protected IFlowService flowService;
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800198 protected ITopoRouteService topoRouteService;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800199 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800200
201 // Configuration options
202 protected int openFlowPort = 6633;
203 protected int workerThreads = 0;
204 // The id for this controller node. Should be unique for each controller
205 // node in a controller cluster.
206 protected String controllerId = "localhost";
207
208 // The current role of the controller.
209 // If the controller isn't configured to support roles, then this is null.
210 protected Role role;
211 // A helper that handles sending and timeout handling for role requests
212 protected RoleChanger roleChanger;
213
214 // Start time of the controller
215 protected long systemStartTime;
216
217 // Flag to always flush flow table on switch reconnect (HA or otherwise)
218 protected boolean alwaysClearFlowsOnSwAdd = false;
219
220 // Storage table names
221 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
222 protected static final String CONTROLLER_ID = "id";
223
224 protected static final String SWITCH_TABLE_NAME = "controller_switch";
225 protected static final String SWITCH_DATAPATH_ID = "dpid";
226 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
227 protected static final String SWITCH_IP = "ip";
228 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
229 protected static final String SWITCH_ACTIVE = "active";
230 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
231 protected static final String SWITCH_CAPABILITIES = "capabilities";
232 protected static final String SWITCH_BUFFERS = "buffers";
233 protected static final String SWITCH_TABLES = "tables";
234 protected static final String SWITCH_ACTIONS = "actions";
235
236 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
237 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
238
239 protected static final String PORT_TABLE_NAME = "controller_port";
240 protected static final String PORT_ID = "id";
241 protected static final String PORT_SWITCH = "switch_id";
242 protected static final String PORT_NUMBER = "number";
243 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
244 protected static final String PORT_NAME = "name";
245 protected static final String PORT_CONFIG = "config";
246 protected static final String PORT_STATE = "state";
247 protected static final String PORT_CURRENT_FEATURES = "current_features";
248 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
249 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
250 protected static final String PORT_PEER_FEATURES = "peer_features";
251
252 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
253 protected static final String CONTROLLER_INTERFACE_ID = "id";
254 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
255 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
256 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
257 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
258
259
260
261 // Perf. related configuration
262 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
263 protected static final int BATCH_MAX_SIZE = 100;
264 protected static final boolean ALWAYS_DECODE_ETH = true;
265
266 /**
267 * Updates handled by the main loop
268 */
269 protected interface IUpdate {
270 /**
271 * Calls the appropriate listeners
272 */
273 public void dispatch();
274 }
275 public enum SwitchUpdateType {
276 ADDED,
277 REMOVED,
278 PORTCHANGED
279 }
280 /**
281 * Update message indicating a switch was added or removed
282 */
283 protected class SwitchUpdate implements IUpdate {
284 public IOFSwitch sw;
285 public SwitchUpdateType switchUpdateType;
286 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
287 this.sw = sw;
288 this.switchUpdateType = switchUpdateType;
289 }
290 public void dispatch() {
291 if (log.isTraceEnabled()) {
292 log.trace("Dispatching switch update {} {}",
293 sw, switchUpdateType);
294 }
295 if (switchListeners != null) {
296 for (IOFSwitchListener listener : switchListeners) {
297 switch(switchUpdateType) {
298 case ADDED:
299 listener.addedSwitch(sw);
300 break;
301 case REMOVED:
302 listener.removedSwitch(sw);
303 break;
304 case PORTCHANGED:
305 listener.switchPortChanged(sw.getId());
306 break;
307 }
308 }
309 }
310 }
311 }
312
313 /**
314 * Update message indicating controller's role has changed
315 */
316 protected class HARoleUpdate implements IUpdate {
317 public Role oldRole;
318 public Role newRole;
319 public HARoleUpdate(Role newRole, Role oldRole) {
320 this.oldRole = oldRole;
321 this.newRole = newRole;
322 }
323 public void dispatch() {
324 // Make sure that old and new roles are different.
325 if (oldRole == newRole) {
326 if (log.isTraceEnabled()) {
327 log.trace("HA role update ignored as the old and " +
328 "new roles are the same. newRole = {}" +
329 "oldRole = {}", newRole, oldRole);
330 }
331 return;
332 }
333 if (log.isTraceEnabled()) {
334 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
335 newRole, oldRole);
336 }
337 if (haListeners != null) {
338 for (IHAListener listener : haListeners) {
339 listener.roleChanged(oldRole, newRole);
340 }
341 }
342 }
343 }
344
345 /**
346 * Update message indicating
347 * IPs of controllers in controller cluster have changed.
348 */
349 protected class HAControllerNodeIPUpdate implements IUpdate {
350 public Map<String,String> curControllerNodeIPs;
351 public Map<String,String> addedControllerNodeIPs;
352 public Map<String,String> removedControllerNodeIPs;
353 public HAControllerNodeIPUpdate(
354 HashMap<String,String> curControllerNodeIPs,
355 HashMap<String,String> addedControllerNodeIPs,
356 HashMap<String,String> removedControllerNodeIPs) {
357 this.curControllerNodeIPs = curControllerNodeIPs;
358 this.addedControllerNodeIPs = addedControllerNodeIPs;
359 this.removedControllerNodeIPs = removedControllerNodeIPs;
360 }
361 public void dispatch() {
362 if (log.isTraceEnabled()) {
363 log.trace("Dispatching HA Controller Node IP update "
364 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
365 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
366 removedControllerNodeIPs }
367 );
368 }
369 if (haListeners != null) {
370 for (IHAListener listener: haListeners) {
371 listener.controllerNodeIPsChanged(curControllerNodeIPs,
372 addedControllerNodeIPs, removedControllerNodeIPs);
373 }
374 }
375 }
376 }
377
378 // ***************
379 // Getters/Setters
380 // ***************
381
382 public void setStorageSourceService(IStorageSourceService storageSource) {
383 this.storageSource = storageSource;
384 }
385
386 public void setCounterStore(ICounterStoreService counterStore) {
387 this.counterStore = counterStore;
388 }
389
390 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
391 this.pktinProcTime = pits;
392 }
393
394 public void setRestApiService(IRestApiService restApi) {
395 this.restApi = restApi;
396 }
397
398 public void setThreadPoolService(IThreadPoolService tp) {
399 this.threadPool = tp;
400 }
401
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800402 public void setFlowService(IFlowService serviceImpl) {
403 this.flowService = serviceImpl;
404 }
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800405
406 public void setTopoRouteService(ITopoRouteService serviceImpl) {
407 this.topoRouteService = serviceImpl;
408 }
Jonathan Hartc2e95ee2013-02-22 15:25:11 -0800409
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800410 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800411 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800412 }
413
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800414 @Override
415 public Role getRole() {
416 synchronized(roleChanger) {
417 return role;
418 }
419 }
420
421 @Override
422 public void setRole(Role role) {
423 if (role == null) throw new NullPointerException("Role can not be null.");
424 if (role == Role.MASTER && this.role == Role.SLAVE) {
425 // Reset db state to Inactive for all switches.
426 updateAllInactiveSwitchInfo();
427 }
428
429 // Need to synchronize to ensure a reliable ordering on role request
430 // messages send and to ensure the list of connected switches is stable
431 // RoleChanger will handle the actual sending of the message and
432 // timeout handling
433 // @see RoleChanger
434 synchronized(roleChanger) {
435 if (role.equals(this.role)) {
436 log.debug("Ignoring role change: role is already {}", role);
437 return;
438 }
439
440 Role oldRole = this.role;
441 this.role = role;
442
443 log.debug("Submitting role change request to role {}", role);
444 roleChanger.submitRequest(connectedSwitches, role);
445
446 // Enqueue an update for our listeners.
447 try {
448 this.updates.put(new HARoleUpdate(role, oldRole));
449 } catch (InterruptedException e) {
450 log.error("Failure adding update to queue", e);
451 }
452 }
453 }
454
455
456
457 // **********************
458 // ChannelUpstreamHandler
459 // **********************
460
461 /**
462 * Return a new channel handler for processing a switch connections
463 * @param state The channel state object for the connection
464 * @return the new channel handler
465 */
466 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
467 return new OFChannelHandler(state);
468 }
469
Jonathan Hartcc957a02013-02-26 10:39:04 -0800470 protected class RoleChangeCallback implements ControlChangeCallback {
471 @Override
472 public void controlChanged(long dpid, boolean hasControl) {
473 log.info("Role change callback for switch {}, hasControl {}",
474 HexString.toHexString(dpid), hasControl);
475
476 synchronized(roleChanger){
477 OFSwitchImpl sw = null;
478 for (OFSwitchImpl connectedSw : connectedSwitches){
479 if (connectedSw.getId() == dpid){
480 sw = connectedSw;
481 break;
482 }
483 }
484 if (sw == null){
485 log.warn("Switch {} not found in connected switches",
486 HexString.toHexString(dpid));
487 return;
488 }
489
490 Role role = null;
491
Pankaj Berde01939e92013-03-08 14:38:27 -0800492 /*
493 * issue #229
494 * Cannot rely on sw.getRole() as it can be behind due to pending
495 * role changes in the queue. Just submit it and late the RoleChanger
496 * handle duplicates.
497 */
498
499 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800500 role = Role.MASTER;
501 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800502 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800503 role = Role.SLAVE;
504 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800505
506 log.debug("Sending role request {} msg to {}", role, sw);
507 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
508 swList.add(sw);
509 roleChanger.submitRequest(swList, role);
510
Jonathan Hartcc957a02013-02-26 10:39:04 -0800511 }
512
513 }
514 }
515
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800516 /**
517 * Channel handler deals with the switch connection and dispatches
518 * switch messages to the appropriate locations.
519 * @author readams
520 */
521 protected class OFChannelHandler
522 extends IdleStateAwareChannelUpstreamHandler {
523 protected OFSwitchImpl sw;
524 protected OFChannelState state;
525
526 public OFChannelHandler(OFChannelState state) {
527 this.state = state;
528 }
529
530 @Override
531 @LogMessageDoc(message="New switch connection from {ip address}",
532 explanation="A new switch has connected from the " +
533 "specified IP address")
534 public void channelConnected(ChannelHandlerContext ctx,
535 ChannelStateEvent e) throws Exception {
536 log.info("New switch connection from {}",
537 e.getChannel().getRemoteAddress());
538
539 sw = new OFSwitchImpl();
540 sw.setChannel(e.getChannel());
541 sw.setFloodlightProvider(Controller.this);
542 sw.setThreadPoolService(threadPool);
543
544 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
545 msglist.add(factory.getMessage(OFType.HELLO));
546 e.getChannel().write(msglist);
547
548 }
549
550 @Override
551 @LogMessageDoc(message="Disconnected switch {switch information}",
552 explanation="The specified switch has disconnected.")
553 public void channelDisconnected(ChannelHandlerContext ctx,
554 ChannelStateEvent e) throws Exception {
555 if (sw != null && state.hsState == HandshakeState.READY) {
556 if (activeSwitches.containsKey(sw.getId())) {
557 // It's safe to call removeSwitch even though the map might
558 // not contain this particular switch but another with the
559 // same DPID
560 removeSwitch(sw);
561 }
562 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700563 if (controlRequested) {
564 registryService.releaseControl(sw.getId());
565 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800566 connectedSwitches.remove(sw);
567 }
568 sw.setConnected(false);
569 }
570 log.info("Disconnected switch {}", sw);
571 }
572
573 @Override
574 @LogMessageDocs({
575 @LogMessageDoc(level="ERROR",
576 message="Disconnecting switch {switch} due to read timeout",
577 explanation="The connected switch has failed to send any " +
578 "messages or respond to echo requests",
579 recommendation=LogMessageDoc.CHECK_SWITCH),
580 @LogMessageDoc(level="ERROR",
581 message="Disconnecting switch {switch}: failed to " +
582 "complete handshake",
583 explanation="The switch did not respond correctly " +
584 "to handshake messages",
585 recommendation=LogMessageDoc.CHECK_SWITCH),
586 @LogMessageDoc(level="ERROR",
587 message="Disconnecting switch {switch} due to IO Error: {}",
588 explanation="There was an error communicating with the switch",
589 recommendation=LogMessageDoc.CHECK_SWITCH),
590 @LogMessageDoc(level="ERROR",
591 message="Disconnecting switch {switch} due to switch " +
592 "state error: {error}",
593 explanation="The switch sent an unexpected message",
594 recommendation=LogMessageDoc.CHECK_SWITCH),
595 @LogMessageDoc(level="ERROR",
596 message="Disconnecting switch {switch} due to " +
597 "message parse failure",
598 explanation="Could not parse a message from the switch",
599 recommendation=LogMessageDoc.CHECK_SWITCH),
600 @LogMessageDoc(level="ERROR",
601 message="Terminating controller due to storage exception",
602 explanation=ERROR_DATABASE,
603 recommendation=LogMessageDoc.CHECK_CONTROLLER),
604 @LogMessageDoc(level="ERROR",
605 message="Could not process message: queue full",
606 explanation="OpenFlow messages are arriving faster than " +
607 " the controller can process them.",
608 recommendation=LogMessageDoc.CHECK_CONTROLLER),
609 @LogMessageDoc(level="ERROR",
610 message="Error while processing message " +
611 "from switch {switch} {cause}",
612 explanation="An error occurred processing the switch message",
613 recommendation=LogMessageDoc.GENERIC_ACTION)
614 })
615 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
616 throws Exception {
617 if (e.getCause() instanceof ReadTimeoutException) {
618 // switch timeout
619 log.error("Disconnecting switch {} due to read timeout", sw);
620 ctx.getChannel().close();
621 } else if (e.getCause() instanceof HandshakeTimeoutException) {
622 log.error("Disconnecting switch {}: failed to complete handshake",
623 sw);
624 ctx.getChannel().close();
625 } else if (e.getCause() instanceof ClosedChannelException) {
626 //log.warn("Channel for sw {} already closed", sw);
627 } else if (e.getCause() instanceof IOException) {
628 log.error("Disconnecting switch {} due to IO Error: {}",
629 sw, e.getCause().getMessage());
630 ctx.getChannel().close();
631 } else if (e.getCause() instanceof SwitchStateException) {
632 log.error("Disconnecting switch {} due to switch state error: {}",
633 sw, e.getCause().getMessage());
634 ctx.getChannel().close();
635 } else if (e.getCause() instanceof MessageParseException) {
636 log.error("Disconnecting switch " + sw +
637 " due to message parse failure",
638 e.getCause());
639 ctx.getChannel().close();
640 } else if (e.getCause() instanceof StorageException) {
641 log.error("Terminating controller due to storage exception",
642 e.getCause());
643 terminate();
644 } else if (e.getCause() instanceof RejectedExecutionException) {
645 log.warn("Could not process message: queue full");
646 } else {
647 log.error("Error while processing message from switch " + sw,
648 e.getCause());
649 ctx.getChannel().close();
650 }
651 }
652
653 @Override
654 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
655 throws Exception {
656 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
657 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
658 e.getChannel().write(msglist);
659 }
660
661 @Override
662 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
663 throws Exception {
664 if (e.getMessage() instanceof List) {
665 @SuppressWarnings("unchecked")
666 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
667
668 for (OFMessage ofm : msglist) {
669 try {
670 processOFMessage(ofm);
671 }
672 catch (Exception ex) {
673 // We are the last handler in the stream, so run the
674 // exception through the channel again by passing in
675 // ctx.getChannel().
676 Channels.fireExceptionCaught(ctx.getChannel(), ex);
677 }
678 }
679
680 // Flush all flow-mods/packet-out generated from this "train"
681 OFSwitchImpl.flush_all();
682 }
683 }
684
685 /**
686 * Process the request for the switch description
687 */
688 @LogMessageDoc(level="ERROR",
689 message="Exception in reading description " +
690 " during handshake {exception}",
691 explanation="Could not process the switch description string",
692 recommendation=LogMessageDoc.CHECK_SWITCH)
693 void processSwitchDescReply() {
694 try {
695 // Read description, if it has been updated
696 @SuppressWarnings("unchecked")
697 Future<List<OFStatistics>> desc_future =
698 (Future<List<OFStatistics>>)sw.
699 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
700 List<OFStatistics> values =
701 desc_future.get(0, TimeUnit.MILLISECONDS);
702 if (values != null) {
703 OFDescriptionStatistics description =
704 new OFDescriptionStatistics();
705 ChannelBuffer data =
706 ChannelBuffers.buffer(description.getLength());
707 for (OFStatistics f : values) {
708 f.writeTo(data);
709 description.readFrom(data);
710 break; // SHOULD be a list of length 1
711 }
712 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
713 description);
714 sw.setSwitchProperties(description);
715 data = null;
716
717 // At this time, also set other switch properties from storage
718 boolean is_core_switch = false;
719 IResultSet resultSet = null;
720 try {
721 String swid = sw.getStringId();
722 resultSet =
723 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
724 for (Iterator<IResultSet> it =
725 resultSet.iterator(); it.hasNext();) {
726 // In case of multiple rows, use the status
727 // in last row?
728 Map<String, Object> row = it.next().getRow();
729 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
730 if (log.isDebugEnabled()) {
731 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
732 "config for switch={}, is-core={}",
733 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
734 }
735 String ics =
736 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
737 is_core_switch = ics.equals("true");
738 }
739 }
740 }
741 finally {
742 if (resultSet != null)
743 resultSet.close();
744 }
745 if (is_core_switch) {
746 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
747 new Boolean(true));
748 }
749 }
750 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
751 state.hasDescription = true;
752 checkSwitchReady();
753 }
754 catch (InterruptedException ex) {
755 // Ignore
756 }
757 catch (TimeoutException ex) {
758 // Ignore
759 } catch (Exception ex) {
760 log.error("Exception in reading description " +
761 " during handshake", ex);
762 }
763 }
764
765 /**
766 * Send initial switch setup information that we need before adding
767 * the switch
768 * @throws IOException
769 */
770 void sendHelloConfiguration() throws IOException {
771 // Send initial Features Request
772 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
773 }
774
775 /**
776 * Send the configuration requests we can only do after we have
777 * the features reply
778 * @throws IOException
779 */
780 void sendFeatureReplyConfiguration() throws IOException {
781 // Ensure we receive the full packet via PacketIn
782 OFSetConfig config = (OFSetConfig) factory
783 .getMessage(OFType.SET_CONFIG);
784 config.setMissSendLength((short) 0xffff)
785 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
786 sw.write(config, null);
787 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
788 null);
789
790 // Get Description to set switch-specific flags
791 OFStatisticsRequest req = new OFStatisticsRequest();
792 req.setStatisticType(OFStatisticsType.DESC);
793 req.setLengthU(req.getLengthU());
794 Future<List<OFStatistics>> dfuture =
795 sw.getStatistics(req);
796 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
797 dfuture);
798
799 }
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700800
801 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800802 protected void checkSwitchReady() {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700803
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800804 if (state.hsState == HandshakeState.FEATURES_REPLY &&
805 state.hasDescription && state.hasGetConfigReply) {
806
807 state.hsState = HandshakeState.READY;
808
809 synchronized(roleChanger) {
810 // We need to keep track of all of the switches that are connected
811 // to the controller, in any role, so that we can later send the
812 // role request messages when the controller role changes.
813 // We need to be synchronized while doing this: we must not
814 // send a another role request to the connectedSwitches until
815 // we were able to add this new switch to connectedSwitches
816 // *and* send the current role to the new switch.
817 connectedSwitches.add(sw);
818
819 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800820 //Put the switch in SLAVE mode until we know we have control
821 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
822 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
823 swList.add(sw);
824 roleChanger.submitRequest(swList, Role.SLAVE);
825
Jonathan Hartcc957a02013-02-26 10:39:04 -0800826 //Request control of the switch from the global registry
827 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700828 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800829 registryService.requestControl(sw.getId(),
830 new RoleChangeCallback());
831 } catch (RegistryException e) {
832 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700833 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800834 }
835
Jonathan Hart97801ac2013-02-26 14:29:16 -0800836
Jonathan Hartcc957a02013-02-26 10:39:04 -0800837
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800838 // Send a role request if role support is enabled for the controller
839 // This is a probe that we'll use to determine if the switch
840 // actually supports the role request message. If it does we'll
841 // get back a role reply message. If it doesn't we'll get back an
842 // OFError message.
843 // If role is MASTER we will promote switch to active
844 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800845 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800846 log.debug("This controller's role is {}, " +
847 "sending initial role request msg to {}",
848 role, sw);
849 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
850 swList.add(sw);
851 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800852 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800853 }
854 else {
855 // Role supported not enabled on controller (for now)
856 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800857 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800858 "not sending role request msg to {}",
859 role, sw);
860 // Need to clear FlowMods before we add the switch
861 // and dispatch updates otherwise we have a race condition.
862 sw.clearAllFlowMods();
863 addSwitch(sw);
864 state.firstRoleReplyReceived = true;
865 }
866 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700867 if (!controlRequested) {
868 // yield to allow other thread(s) to release control
869 try {
870 Thread.sleep(10);
871 } catch (InterruptedException e) {
872 // Ignore interruptions
873 }
874 // safer to bounce the switch to reconnect here than proceeding further
875 sw.channel.close();
876 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800877 }
878 }
879
880 /* Handle a role reply message we received from the switch. Since
881 * netty serializes message dispatch we don't need to synchronize
882 * against other receive operations from the same switch, so no need
883 * to synchronize addSwitch(), removeSwitch() operations from the same
884 * connection.
885 * FIXME: However, when a switch with the same DPID connects we do
886 * need some synchronization. However, handling switches with same
887 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
888 * removedSwitch notification):1
889 *
890 */
891 @LogMessageDoc(level="ERROR",
892 message="Invalid role value in role reply message",
893 explanation="Was unable to set the HA role (master or slave) " +
894 "for the controller.",
895 recommendation=LogMessageDoc.CHECK_CONTROLLER)
896 protected void handleRoleReplyMessage(OFVendor vendorMessage,
897 OFRoleReplyVendorData roleReplyVendorData) {
898 // Map from the role code in the message to our role enum
899 int nxRole = roleReplyVendorData.getRole();
900 Role role = null;
901 switch (nxRole) {
902 case OFRoleVendorData.NX_ROLE_OTHER:
903 role = Role.EQUAL;
904 break;
905 case OFRoleVendorData.NX_ROLE_MASTER:
906 role = Role.MASTER;
907 break;
908 case OFRoleVendorData.NX_ROLE_SLAVE:
909 role = Role.SLAVE;
910 break;
911 default:
912 log.error("Invalid role value in role reply message");
913 sw.getChannel().close();
914 return;
915 }
916
917 log.debug("Handling role reply for role {} from {}. " +
918 "Controller's role is {} ",
919 new Object[] { role, sw, Controller.this.role}
920 );
921
922 sw.deliverRoleReply(vendorMessage.getXid(), role);
923
924 boolean isActive = activeSwitches.containsKey(sw.getId());
925 if (!isActive && sw.isActive()) {
926 // Transition from SLAVE to MASTER.
927
928 if (!state.firstRoleReplyReceived ||
929 getAlwaysClearFlowsOnSwAdd()) {
930 // This is the first role-reply message we receive from
931 // this switch or roles were disabled when the switch
932 // connected:
933 // Delete all pre-existing flows for new connections to
934 // the master
935 //
936 // FIXME: Need to think more about what the test should
937 // be for when we flush the flow-table? For example,
938 // if all the controllers are temporarily in the backup
939 // role (e.g. right after a failure of the master
940 // controller) at the point the switch connects, then
941 // all of the controllers will initially connect as
942 // backup controllers and not flush the flow-table.
943 // Then when one of them is promoted to master following
944 // the master controller election the flow-table
945 // will still not be flushed because that's treated as
946 // a failover event where we don't want to flush the
947 // flow-table. The end result would be that the flow
948 // table for a newly connected switch is never
949 // flushed. Not sure how to handle that case though...
950 sw.clearAllFlowMods();
951 log.debug("First role reply from master switch {}, " +
952 "clear FlowTable to active switch list",
953 HexString.toHexString(sw.getId()));
954 }
955
956 // Some switches don't seem to update us with port
957 // status messages while in slave role.
958 readSwitchPortStateFromStorage(sw);
959
960 // Only add the switch to the active switch list if
961 // we're not in the slave role. Note that if the role
962 // attribute is null, then that means that the switch
963 // doesn't support the role request messages, so in that
964 // case we're effectively in the EQUAL role and the
965 // switch should be included in the active switch list.
966 addSwitch(sw);
967 log.debug("Added master switch {} to active switch list",
968 HexString.toHexString(sw.getId()));
969
970 }
971 else if (isActive && !sw.isActive()) {
972 // Transition from MASTER to SLAVE: remove switch
973 // from active switch list.
974 log.debug("Removed slave switch {} from active switch" +
975 " list", HexString.toHexString(sw.getId()));
976 removeSwitch(sw);
977 }
978
979 // Indicate that we have received a role reply message.
980 state.firstRoleReplyReceived = true;
981 }
982
983 protected boolean handleVendorMessage(OFVendor vendorMessage) {
984 boolean shouldHandleMessage = false;
985 int vendor = vendorMessage.getVendor();
986 switch (vendor) {
987 case OFNiciraVendorData.NX_VENDOR_ID:
988 OFNiciraVendorData niciraVendorData =
989 (OFNiciraVendorData)vendorMessage.getVendorData();
990 int dataType = niciraVendorData.getDataType();
991 switch (dataType) {
992 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
993 OFRoleReplyVendorData roleReplyVendorData =
994 (OFRoleReplyVendorData) niciraVendorData;
995 handleRoleReplyMessage(vendorMessage,
996 roleReplyVendorData);
997 break;
998 default:
999 log.warn("Unhandled Nicira VENDOR message; " +
1000 "data type = {}", dataType);
1001 break;
1002 }
1003 break;
1004 default:
1005 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
1006 break;
1007 }
1008
1009 return shouldHandleMessage;
1010 }
1011
1012 /**
1013 * Dispatch an Openflow message from a switch to the appropriate
1014 * handler.
1015 * @param m The message to process
1016 * @throws IOException
1017 * @throws SwitchStateException
1018 */
1019 @LogMessageDocs({
1020 @LogMessageDoc(level="WARN",
1021 message="Config Reply from {switch} has " +
1022 "miss length set to {length}",
1023 explanation="The controller requires that the switch " +
1024 "use a miss length of 0xffff for correct " +
1025 "function",
1026 recommendation="Use a different switch to ensure " +
1027 "correct function"),
1028 @LogMessageDoc(level="WARN",
1029 message="Received ERROR from sw {switch} that "
1030 +"indicates roles are not supported "
1031 +"but we have received a valid "
1032 +"role reply earlier",
1033 explanation="The switch sent a confusing message to the" +
1034 "controller")
1035 })
1036 protected void processOFMessage(OFMessage m)
1037 throws IOException, SwitchStateException {
1038 boolean shouldHandleMessage = false;
1039
1040 switch (m.getType()) {
1041 case HELLO:
1042 if (log.isTraceEnabled())
1043 log.trace("HELLO from {}", sw);
1044
1045 if (state.hsState.equals(HandshakeState.START)) {
1046 state.hsState = HandshakeState.HELLO;
1047 sendHelloConfiguration();
1048 } else {
1049 throw new SwitchStateException("Unexpected HELLO from "
1050 + sw);
1051 }
1052 break;
1053 case ECHO_REQUEST:
1054 OFEchoReply reply =
1055 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
1056 reply.setXid(m.getXid());
1057 sw.write(reply, null);
1058 break;
1059 case ECHO_REPLY:
1060 break;
1061 case FEATURES_REPLY:
1062 if (log.isTraceEnabled())
1063 log.trace("Features Reply from {}", sw);
1064
1065 sw.setFeaturesReply((OFFeaturesReply) m);
1066 if (state.hsState.equals(HandshakeState.HELLO)) {
1067 sendFeatureReplyConfiguration();
1068 state.hsState = HandshakeState.FEATURES_REPLY;
1069 // uncomment to enable "dumb" switches like cbench
1070 // state.hsState = HandshakeState.READY;
1071 // addSwitch(sw);
1072 } else {
1073 // return results to rest api caller
1074 sw.deliverOFFeaturesReply(m);
1075 // update database */
1076 updateActiveSwitchInfo(sw);
1077 }
1078 break;
1079 case GET_CONFIG_REPLY:
1080 if (log.isTraceEnabled())
1081 log.trace("Get config reply from {}", sw);
1082
1083 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
1084 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
1085 throw new SwitchStateException(em);
1086 }
1087 OFGetConfigReply cr = (OFGetConfigReply) m;
1088 if (cr.getMissSendLength() == (short)0xffff) {
1089 log.trace("Config Reply from {} confirms " +
1090 "miss length set to 0xffff", sw);
1091 } else {
1092 log.warn("Config Reply from {} has " +
1093 "miss length set to {}",
1094 sw, cr.getMissSendLength() & 0xffff);
1095 }
1096 state.hasGetConfigReply = true;
1097 checkSwitchReady();
1098 break;
1099 case VENDOR:
1100 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1101 break;
1102 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001103 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001104 // TODO: we need better error handling. Especially for
1105 // request/reply style message (stats, roles) we should have
1106 // a unified way to lookup the xid in the error message.
1107 // This will probable involve rewriting the way we handle
1108 // request/reply style messages.
1109 OFError error = (OFError) m;
1110 boolean shouldLogError = true;
1111 // TODO: should we check that firstRoleReplyReceived is false,
1112 // i.e., check only whether the first request fails?
1113 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1114 boolean isBadVendorError =
1115 (error.getErrorType() == OFError.OFErrorType.
1116 OFPET_BAD_REQUEST.getValue());
1117 // We expect to receive a bad vendor error when
1118 // we're connected to a switch that doesn't support
1119 // the Nicira vendor extensions (i.e. not OVS or
1120 // derived from OVS). By protocol, it should also be
1121 // BAD_VENDOR, but too many switch implementations
1122 // get it wrong and we can already check the xid()
1123 // so we can ignore the type with confidence that this
1124 // is not a spurious error
1125 shouldLogError = !isBadVendorError;
1126 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001127 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001128 if (state.firstRoleReplyReceived && (role != null)) {
1129 log.warn("Received ERROR from sw {} that "
1130 +"indicates roles are not supported "
1131 +"but we have received a valid "
1132 +"role reply earlier", sw);
1133 }
1134 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001135 Role requestedRole =
1136 sw.deliverRoleRequestNotSupported(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001137 synchronized(roleChanger) {
1138 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001139 //This will now never happen. The Controller's role
1140 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001141 // the switch doesn't understand role request
1142 // messages and the current controller role is
1143 // slave. We need to disconnect the switch.
1144 // @see RoleChanger for rationale
1145 sw.getChannel().close();
1146 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001147 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001148 log.debug("Adding switch {} because we got an error" +
1149 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001150 // Controller's role is master: add to
1151 // active
1152 // TODO: check if clearing flow table is
1153 // right choice here.
1154 // Need to clear FlowMods before we add the switch
1155 // and dispatch updates otherwise we have a race condition.
1156 // TODO: switch update is async. Won't we still have a potential
1157 // race condition?
1158 sw.clearAllFlowMods();
1159 addSwitch(sw);
1160 }
1161 }
1162 }
1163 else {
1164 // TODO: Is this the right thing to do if we receive
1165 // some other error besides a bad vendor error?
1166 // Presumably that means the switch did actually
1167 // understand the role request message, but there
1168 // was some other error from processing the message.
1169 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1170 // error code, but it doesn't look like the Nicira
1171 // role request has that. Should check OVS source
1172 // code to see if it's possible for any other errors
1173 // to be returned.
1174 // If we received an error the switch is not
1175 // in the correct role, so we need to disconnect it.
1176 // We could also resend the request but then we need to
1177 // check if there are other pending request in which
1178 // case we shouldn't resend. If we do resend we need
1179 // to make sure that the switch eventually accepts one
1180 // of our requests or disconnect the switch. This feels
1181 // cumbersome.
1182 sw.getChannel().close();
1183 }
1184 }
1185 // Once we support OF 1.2, we'd add code to handle it here.
1186 //if (error.getXid() == state.ofRoleRequestXid) {
1187 //}
1188 if (shouldLogError)
1189 logError(sw, error);
1190 break;
1191 case STATS_REPLY:
1192 if (state.hsState.ordinal() <
1193 HandshakeState.FEATURES_REPLY.ordinal()) {
1194 String em = "Unexpected STATS_REPLY from " + sw;
1195 throw new SwitchStateException(em);
1196 }
1197 sw.deliverStatisticsReply(m);
1198 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1199 processSwitchDescReply();
1200 }
1201 break;
1202 case PORT_STATUS:
1203 // We want to update our port state info even if we're in
1204 // the slave role, but we only want to update storage if
1205 // we're the master (or equal).
1206 boolean updateStorage = state.hsState.
1207 equals(HandshakeState.READY) &&
1208 (sw.getRole() != Role.SLAVE);
1209 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1210 shouldHandleMessage = true;
1211 break;
1212
1213 default:
1214 shouldHandleMessage = true;
1215 break;
1216 }
1217
1218 if (shouldHandleMessage) {
1219 sw.getListenerReadLock().lock();
1220 try {
1221 if (sw.isConnected()) {
1222 if (!state.hsState.equals(HandshakeState.READY)) {
1223 log.debug("Ignoring message type {} received " +
1224 "from switch {} before switch is " +
1225 "fully configured.", m.getType(), sw);
1226 }
1227 // Check if the controller is in the slave role for the
1228 // switch. If it is, then don't dispatch the message to
1229 // the listeners.
1230 // TODO: Should we dispatch messages that we expect to
1231 // receive when we're in the slave role, e.g. port
1232 // status messages? Since we're "hiding" switches from
1233 // the listeners when we're in the slave role, then it
1234 // seems a little weird to dispatch port status messages
1235 // to them. On the other hand there might be special
1236 // modules that care about all of the connected switches
1237 // and would like to receive port status notifications.
1238 else if (sw.getRole() == Role.SLAVE) {
1239 // Don't log message if it's a port status message
1240 // since we expect to receive those from the switch
1241 // and don't want to emit spurious messages.
1242 if (m.getType() != OFType.PORT_STATUS) {
1243 log.debug("Ignoring message type {} received " +
1244 "from switch {} while in the slave role.",
1245 m.getType(), sw);
1246 }
1247 } else {
1248 handleMessage(sw, m, null);
1249 }
1250 }
1251 }
1252 finally {
1253 sw.getListenerReadLock().unlock();
1254 }
1255 }
1256 }
1257 }
1258
1259 // ****************
1260 // Message handlers
1261 // ****************
1262
1263 protected void handlePortStatusMessage(IOFSwitch sw,
1264 OFPortStatus m,
1265 boolean updateStorage) {
1266 short portNumber = m.getDesc().getPortNumber();
1267 OFPhysicalPort port = m.getDesc();
1268 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001269 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1270 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001271 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001272 if (!portDown) {
Pankaj Berde6debb042013-01-16 18:04:32 -08001273 swStore.addPort(sw.getStringId(), port);
1274 } else {
1275 swStore.deletePort(sw.getStringId(), port.getPortNumber());
1276 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001277 if (updateStorage)
1278 updatePortInfo(sw, port);
1279 log.debug("Port #{} modified for {}", portNumber, sw);
1280 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1281 sw.setPort(port);
Pankaj Berde8557a462013-01-07 08:59:31 -08001282 swStore.addPort(sw.getStringId(), port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001283 if (updateStorage)
1284 updatePortInfo(sw, port);
1285 log.debug("Port #{} added for {}", portNumber, sw);
1286 } else if (m.getReason() ==
1287 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1288 sw.deletePort(portNumber);
Pankaj Berde8557a462013-01-07 08:59:31 -08001289 swStore.deletePort(sw.getStringId(), portNumber);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001290 if (updateStorage)
1291 removePortInfo(sw, portNumber);
1292 log.debug("Port #{} deleted for {}", portNumber, sw);
1293 }
1294 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1295 try {
1296 this.updates.put(update);
1297 } catch (InterruptedException e) {
1298 log.error("Failure adding update to queue", e);
1299 }
1300 }
1301
1302 /**
1303 * flcontext_cache - Keep a thread local stack of contexts
1304 */
1305 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1306 new ThreadLocal <Stack<FloodlightContext>> () {
1307 @Override
1308 protected Stack<FloodlightContext> initialValue() {
1309 return new Stack<FloodlightContext>();
1310 }
1311 };
1312
1313 /**
1314 * flcontext_alloc - pop a context off the stack, if required create a new one
1315 * @return FloodlightContext
1316 */
1317 protected static FloodlightContext flcontext_alloc() {
1318 FloodlightContext flcontext = null;
1319
1320 if (flcontext_cache.get().empty()) {
1321 flcontext = new FloodlightContext();
1322 }
1323 else {
1324 flcontext = flcontext_cache.get().pop();
1325 }
1326
1327 return flcontext;
1328 }
1329
1330 /**
1331 * flcontext_free - Free the context to the current thread
1332 * @param flcontext
1333 */
1334 protected void flcontext_free(FloodlightContext flcontext) {
1335 flcontext.getStorage().clear();
1336 flcontext_cache.get().push(flcontext);
1337 }
1338
1339 /**
1340 * Handle replies to certain OFMessages, and pass others off to listeners
1341 * @param sw The switch for the message
1342 * @param m The message
1343 * @param bContext The floodlight context. If null then floodlight context would
1344 * be allocated in this function
1345 * @throws IOException
1346 */
1347 @LogMessageDocs({
1348 @LogMessageDoc(level="ERROR",
1349 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1350 " field is empty.",
1351 explanation="The switch sent an improperly-formatted PacketIn" +
1352 " message",
1353 recommendation=LogMessageDoc.CHECK_SWITCH),
1354 @LogMessageDoc(level="WARN",
1355 message="Unhandled OF Message: {} from {}",
1356 explanation="The switch sent a message not handled by " +
1357 "the controller")
1358 })
1359 protected void handleMessage(IOFSwitch sw, OFMessage m,
1360 FloodlightContext bContext)
1361 throws IOException {
1362 Ethernet eth = null;
1363
1364 switch (m.getType()) {
1365 case PACKET_IN:
1366 OFPacketIn pi = (OFPacketIn)m;
1367
1368 if (pi.getPacketData().length <= 0) {
1369 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1370 ") because the data field is empty.");
1371 return;
1372 }
1373
1374 if (Controller.ALWAYS_DECODE_ETH) {
1375 eth = new Ethernet();
1376 eth.deserialize(pi.getPacketData(), 0,
1377 pi.getPacketData().length);
1378 counterStore.updatePacketInCounters(sw, m, eth);
1379 }
1380 // fall through to default case...
1381
1382 default:
1383
1384 List<IOFMessageListener> listeners = null;
1385 if (messageListeners.containsKey(m.getType())) {
1386 listeners = messageListeners.get(m.getType()).
1387 getOrderedListeners();
1388 }
1389
1390 FloodlightContext bc = null;
1391 if (listeners != null) {
1392 // Check if floodlight context is passed from the calling
1393 // function, if so use that floodlight context, otherwise
1394 // allocate one
1395 if (bContext == null) {
1396 bc = flcontext_alloc();
1397 } else {
1398 bc = bContext;
1399 }
1400 if (eth != null) {
1401 IFloodlightProviderService.bcStore.put(bc,
1402 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1403 eth);
1404 }
1405
1406 // Get the starting time (overall and per-component) of
1407 // the processing chain for this packet if performance
1408 // monitoring is turned on
1409 pktinProcTime.bootstrap(listeners);
1410 pktinProcTime.recordStartTimePktIn();
1411 Command cmd;
1412 for (IOFMessageListener listener : listeners) {
1413 if (listener instanceof IOFSwitchFilter) {
1414 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1415 continue;
1416 }
1417 }
1418
1419 pktinProcTime.recordStartTimeComp(listener);
1420 cmd = listener.receive(sw, m, bc);
1421 pktinProcTime.recordEndTimeComp(listener);
1422
1423 if (Command.STOP.equals(cmd)) {
1424 break;
1425 }
1426 }
1427 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1428 } else {
1429 log.warn("Unhandled OF Message: {} from {}", m, sw);
1430 }
1431
1432 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1433 }
1434 }
1435
1436 /**
1437 * Log an OpenFlow error message from a switch
1438 * @param sw The switch that sent the error
1439 * @param error The error message
1440 */
1441 @LogMessageDoc(level="ERROR",
1442 message="Error {error type} {error code} from {switch}",
1443 explanation="The switch responded with an unexpected error" +
1444 "to an OpenFlow message from the controller",
1445 recommendation="This could indicate improper network operation. " +
1446 "If the problem persists restarting the switch and " +
1447 "controller may help."
1448 )
1449 protected void logError(IOFSwitch sw, OFError error) {
1450 int etint = 0xffff & error.getErrorType();
1451 if (etint < 0 || etint >= OFErrorType.values().length) {
1452 log.error("Unknown error code {} from sw {}", etint, sw);
1453 }
1454 OFErrorType et = OFErrorType.values()[etint];
1455 switch (et) {
1456 case OFPET_HELLO_FAILED:
1457 OFHelloFailedCode hfc =
1458 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1459 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1460 break;
1461 case OFPET_BAD_REQUEST:
1462 OFBadRequestCode brc =
1463 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1464 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1465 break;
1466 case OFPET_BAD_ACTION:
1467 OFBadActionCode bac =
1468 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1469 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1470 break;
1471 case OFPET_FLOW_MOD_FAILED:
1472 OFFlowModFailedCode fmfc =
1473 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1474 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1475 break;
1476 case OFPET_PORT_MOD_FAILED:
1477 OFPortModFailedCode pmfc =
1478 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1479 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1480 break;
1481 case OFPET_QUEUE_OP_FAILED:
1482 OFQueueOpFailedCode qofc =
1483 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1484 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1485 break;
1486 default:
1487 break;
1488 }
1489 }
1490
1491 /**
1492 * Add a switch to the active switch list and call the switch listeners.
1493 * This happens either when a switch first connects (and the controller is
1494 * not in the slave role) or when the role of the controller changes from
1495 * slave to master.
1496 * @param sw the switch that has been added
1497 */
1498 // TODO: need to rethink locking and the synchronous switch update.
1499 // We can / should also handle duplicate DPIDs in connectedSwitches
1500 @LogMessageDoc(level="ERROR",
1501 message="New switch added {switch} for already-added switch {switch}",
1502 explanation="A switch with the same DPID as another switch " +
1503 "connected to the controller. This can be caused by " +
1504 "multiple switches configured with the same DPID, or " +
1505 "by a switch reconnected very quickly after " +
1506 "disconnecting.",
1507 recommendation="If this happens repeatedly, it is likely there " +
1508 "are switches with duplicate DPIDs on the network. " +
1509 "Reconfigure the appropriate switches. If it happens " +
1510 "very rarely, then it is likely this is a transient " +
1511 "network problem that can be ignored."
1512 )
1513 protected void addSwitch(IOFSwitch sw) {
1514 // TODO: is it safe to modify the HashMap without holding
1515 // the old switch's lock?
1516 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1517 if (sw == oldSw) {
1518 // Note == for object equality, not .equals for value
1519 log.info("New add switch for pre-existing switch {}", sw);
1520 return;
1521 }
1522
1523 if (oldSw != null) {
1524 oldSw.getListenerWriteLock().lock();
1525 try {
1526 log.error("New switch added {} for already-added switch {}",
1527 sw, oldSw);
1528 // Set the connected flag to false to suppress calling
1529 // the listeners for this switch in processOFMessage
1530 oldSw.setConnected(false);
1531
1532 oldSw.cancelAllStatisticsReplies();
1533
1534 updateInactiveSwitchInfo(oldSw);
1535
1536 // we need to clean out old switch state definitively
1537 // before adding the new switch
1538 // FIXME: It seems not completely kosher to call the
1539 // switch listeners here. I thought one of the points of
1540 // having the asynchronous switch update mechanism was so
1541 // the addedSwitch and removedSwitch were always called
1542 // from a single thread to simplify concurrency issues
1543 // for the listener.
1544 if (switchListeners != null) {
1545 for (IOFSwitchListener listener : switchListeners) {
1546 listener.removedSwitch(oldSw);
1547 }
1548 }
1549 // will eventually trigger a removeSwitch(), which will cause
1550 // a "Not removing Switch ... already removed debug message.
1551 // TODO: Figure out a way to handle this that avoids the
1552 // spurious debug message.
1553 oldSw.getChannel().close();
1554 }
1555 finally {
1556 oldSw.getListenerWriteLock().unlock();
1557 }
1558 }
1559
1560 updateActiveSwitchInfo(sw);
Pankaj Berdef8ad2852013-02-27 17:06:14 -08001561 if (registryService.hasControl(sw.getId())) {
1562 swStore.update(sw.getStringId(), SwitchState.ACTIVE, DM_OPERATION.UPDATE);
1563 for (OFPhysicalPort port: sw.getPorts()) {
1564 swStore.addPort(sw.getStringId(), port);
1565 }
Pankaj Berde0fc4e432013-01-12 09:47:22 -08001566 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001567 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1568 try {
1569 this.updates.put(update);
1570 } catch (InterruptedException e) {
1571 log.error("Failure adding update to queue", e);
1572 }
1573 }
1574
1575 /**
1576 * Remove a switch from the active switch list and call the switch listeners.
1577 * This happens either when the switch is disconnected or when the
1578 * controller's role for the switch changes from master to slave.
1579 * @param sw the switch that has been removed
1580 */
1581 protected void removeSwitch(IOFSwitch sw) {
1582 // No need to acquire the listener lock, since
1583 // this method is only called after netty has processed all
1584 // pending messages
1585 log.debug("removeSwitch: {}", sw);
Pankaj Berdefa4d0f72013-03-13 17:59:37 -07001586 //
1587 // Cannot set sw to inactive in network map due to race condition
1588 // Need a cleanup thread to periodically check switches not active in registry
1589 // and acquire control to set to inactive state in network map and release it
1590 //
1591 // if (registryService.hasControl(sw.getId())) {
1592 // swStore.update(sw.getStringId(), SwitchState.INACTIVE, DM_OPERATION.UPDATE);
1593 // }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001594 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1595 log.debug("Not removing switch {}; already removed", sw);
1596 return;
1597 }
1598 // We cancel all outstanding statistics replies if the switch transition
1599 // from active. In the future we might allow statistics requests
1600 // from slave controllers. Then we need to move this cancelation
1601 // to switch disconnect
1602 sw.cancelAllStatisticsReplies();
Pankaj Berdeafb20532013-01-08 15:05:24 -08001603
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001604
1605 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1606 // here if role support is enabled. In that case if the switch is being
1607 // removed because we've been switched to being in the slave role, then I think
1608 // it's possible that the new master may have already been promoted to master
1609 // and written out the active switch state to storage. If we now execute
1610 // updateInactiveSwitchInfo we may wipe out all of the state that was
1611 // written out by the new master. Maybe need to revisit how we handle all
1612 // of the switch state that's written to storage.
1613
1614 updateInactiveSwitchInfo(sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001615
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001616 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1617 try {
1618 this.updates.put(update);
1619 } catch (InterruptedException e) {
1620 log.error("Failure adding update to queue", e);
1621 }
1622 }
1623
1624 // ***************
1625 // IFloodlightProvider
1626 // ***************
1627
1628 @Override
1629 public synchronized void addOFMessageListener(OFType type,
1630 IOFMessageListener listener) {
1631 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1632 messageListeners.get(type);
1633 if (ldd == null) {
1634 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1635 messageListeners.put(type, ldd);
1636 }
1637 ldd.addListener(type, listener);
1638 }
1639
1640 @Override
1641 public synchronized void removeOFMessageListener(OFType type,
1642 IOFMessageListener listener) {
1643 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1644 messageListeners.get(type);
1645 if (ldd != null) {
1646 ldd.removeListener(listener);
1647 }
1648 }
1649
1650 private void logListeners() {
1651 for (Map.Entry<OFType,
1652 ListenerDispatcher<OFType,
1653 IOFMessageListener>> entry
1654 : messageListeners.entrySet()) {
1655
1656 OFType type = entry.getKey();
1657 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1658 entry.getValue();
1659
1660 StringBuffer sb = new StringBuffer();
1661 sb.append("OFListeners for ");
1662 sb.append(type);
1663 sb.append(": ");
1664 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1665 sb.append(l.getName());
1666 sb.append(",");
1667 }
1668 log.debug(sb.toString());
1669 }
1670 }
1671
1672 public void removeOFMessageListeners(OFType type) {
1673 messageListeners.remove(type);
1674 }
1675
1676 @Override
1677 public Map<Long, IOFSwitch> getSwitches() {
1678 return Collections.unmodifiableMap(this.activeSwitches);
1679 }
1680
1681 @Override
1682 public void addOFSwitchListener(IOFSwitchListener listener) {
1683 this.switchListeners.add(listener);
1684 }
1685
1686 @Override
1687 public void removeOFSwitchListener(IOFSwitchListener listener) {
1688 this.switchListeners.remove(listener);
1689 }
1690
1691 @Override
1692 public Map<OFType, List<IOFMessageListener>> getListeners() {
1693 Map<OFType, List<IOFMessageListener>> lers =
1694 new HashMap<OFType, List<IOFMessageListener>>();
1695 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1696 messageListeners.entrySet()) {
1697 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1698 }
1699 return Collections.unmodifiableMap(lers);
1700 }
1701
1702 @Override
1703 @LogMessageDocs({
1704 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1705 "a null switch",
1706 explanation="Failed to process a message because the switch " +
1707 " is no longer connected."),
1708 @LogMessageDoc(level="ERROR",
1709 message="Error reinjecting OFMessage on switch {switch}",
1710 explanation="An I/O error occured while attempting to " +
1711 "process an OpenFlow message",
1712 recommendation=LogMessageDoc.CHECK_SWITCH)
1713 })
1714 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1715 FloodlightContext bc) {
1716 if (sw == null) {
1717 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1718 return false;
1719 }
1720
1721 // FIXME: Do we need to be able to inject messages to switches
1722 // where we're the slave controller (i.e. they're connected but
1723 // not active)?
1724 // FIXME: Don't we need synchronization logic here so we're holding
1725 // the listener read lock when we call handleMessage? After some
1726 // discussions it sounds like the right thing to do here would be to
1727 // inject the message as a netty upstream channel event so it goes
1728 // through the normal netty event processing, including being
1729 // handled
1730 if (!activeSwitches.containsKey(sw.getId())) return false;
1731
1732 try {
1733 // Pass Floodlight context to the handleMessages()
1734 handleMessage(sw, msg, bc);
1735 } catch (IOException e) {
1736 log.error("Error reinjecting OFMessage on switch {}",
1737 HexString.toHexString(sw.getId()));
1738 return false;
1739 }
1740 return true;
1741 }
1742
1743 @Override
1744 @LogMessageDoc(message="Calling System.exit",
1745 explanation="The controller is terminating")
1746 public synchronized void terminate() {
1747 log.info("Calling System.exit");
1748 System.exit(1);
1749 }
1750
1751 @Override
1752 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1753 // call the overloaded version with floodlight context set to null
1754 return injectOfMessage(sw, msg, null);
1755 }
1756
1757 @Override
1758 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1759 FloodlightContext bc) {
1760 if (log.isTraceEnabled()) {
1761 String str = OFMessage.getDataAsString(sw, m, bc);
1762 log.trace("{}", str);
1763 }
1764
1765 List<IOFMessageListener> listeners = null;
1766 if (messageListeners.containsKey(m.getType())) {
1767 listeners =
1768 messageListeners.get(m.getType()).getOrderedListeners();
1769 }
1770
1771 if (listeners != null) {
1772 for (IOFMessageListener listener : listeners) {
1773 if (listener instanceof IOFSwitchFilter) {
1774 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1775 continue;
1776 }
1777 }
1778 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1779 break;
1780 }
1781 }
1782 }
1783 }
1784
1785 @Override
1786 public BasicFactory getOFMessageFactory() {
1787 return factory;
1788 }
1789
1790 @Override
1791 public String getControllerId() {
1792 return controllerId;
1793 }
1794
1795 // **************
1796 // Initialization
1797 // **************
1798
1799 protected void updateAllInactiveSwitchInfo() {
1800 if (role == Role.SLAVE) {
1801 return;
1802 }
1803 String controllerId = getControllerId();
1804 String[] switchColumns = { SWITCH_DATAPATH_ID,
1805 SWITCH_CONTROLLER_ID,
1806 SWITCH_ACTIVE };
1807 String[] portColumns = { PORT_ID, PORT_SWITCH };
1808 IResultSet switchResultSet = null;
1809 try {
1810 OperatorPredicate op =
1811 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1812 OperatorPredicate.Operator.EQ,
1813 controllerId);
1814 switchResultSet =
1815 storageSource.executeQuery(SWITCH_TABLE_NAME,
1816 switchColumns,
1817 op, null);
1818 while (switchResultSet.next()) {
1819 IResultSet portResultSet = null;
1820 try {
1821 String datapathId =
1822 switchResultSet.getString(SWITCH_DATAPATH_ID);
1823 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1824 op = new OperatorPredicate(PORT_SWITCH,
1825 OperatorPredicate.Operator.EQ,
1826 datapathId);
1827 portResultSet =
1828 storageSource.executeQuery(PORT_TABLE_NAME,
1829 portColumns,
1830 op, null);
1831 while (portResultSet.next()) {
1832 portResultSet.deleteRow();
1833 }
1834 portResultSet.save();
1835 }
1836 finally {
1837 if (portResultSet != null)
1838 portResultSet.close();
1839 }
1840 }
1841 switchResultSet.save();
1842 }
1843 finally {
1844 if (switchResultSet != null)
1845 switchResultSet.close();
1846 }
1847 }
1848
1849 protected void updateControllerInfo() {
1850 updateAllInactiveSwitchInfo();
1851
1852 // Write out the controller info to the storage source
1853 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1854 String id = getControllerId();
1855 controllerInfo.put(CONTROLLER_ID, id);
1856 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1857 }
1858
1859 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1860 if (role == Role.SLAVE) {
1861 return;
1862 }
1863 // Obtain the row info for the switch
1864 Map<String, Object> switchInfo = new HashMap<String, Object>();
1865 String datapathIdString = sw.getStringId();
1866 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1867 String controllerId = getControllerId();
1868 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1869 Date connectedSince = sw.getConnectedSince();
1870 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1871 Channel channel = sw.getChannel();
1872 SocketAddress socketAddress = channel.getRemoteAddress();
1873 if (socketAddress != null) {
1874 String socketAddressString = socketAddress.toString();
1875 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1876 if (socketAddress instanceof InetSocketAddress) {
1877 InetSocketAddress inetSocketAddress =
1878 (InetSocketAddress)socketAddress;
1879 InetAddress inetAddress = inetSocketAddress.getAddress();
1880 String ip = inetAddress.getHostAddress();
1881 switchInfo.put(SWITCH_IP, ip);
1882 }
1883 }
1884
1885 // Write out the switch features info
1886 long capabilities = U32.f(sw.getCapabilities());
1887 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1888 long buffers = U32.f(sw.getBuffers());
1889 switchInfo.put(SWITCH_BUFFERS, buffers);
1890 long tables = U32.f(sw.getTables());
1891 switchInfo.put(SWITCH_TABLES, tables);
1892 long actions = U32.f(sw.getActions());
1893 switchInfo.put(SWITCH_ACTIONS, actions);
1894 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1895
1896 // Update the switch
1897 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1898
1899 // Update the ports
1900 for (OFPhysicalPort port: sw.getPorts()) {
1901 updatePortInfo(sw, port);
1902 }
1903 }
1904
1905 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1906 if (role == Role.SLAVE) {
1907 return;
1908 }
1909 log.debug("Update DB with inactiveSW {}", sw);
1910 // Update the controller info in the storage source to be inactive
1911 Map<String, Object> switchInfo = new HashMap<String, Object>();
1912 String datapathIdString = sw.getStringId();
1913 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1914 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1915 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1916 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1917 }
1918
1919 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1920 if (role == Role.SLAVE) {
1921 return;
1922 }
1923 String datapathIdString = sw.getStringId();
1924 Map<String, Object> portInfo = new HashMap<String, Object>();
1925 int portNumber = U16.f(port.getPortNumber());
1926 String id = datapathIdString + "|" + portNumber;
1927 portInfo.put(PORT_ID, id);
1928 portInfo.put(PORT_SWITCH, datapathIdString);
1929 portInfo.put(PORT_NUMBER, portNumber);
1930 byte[] hardwareAddress = port.getHardwareAddress();
1931 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1932 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1933 String name = port.getName();
1934 portInfo.put(PORT_NAME, name);
1935 long config = U32.f(port.getConfig());
1936 portInfo.put(PORT_CONFIG, config);
1937 long state = U32.f(port.getState());
1938 portInfo.put(PORT_STATE, state);
1939 long currentFeatures = U32.f(port.getCurrentFeatures());
1940 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1941 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1942 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1943 long supportedFeatures = U32.f(port.getSupportedFeatures());
1944 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1945 long peerFeatures = U32.f(port.getPeerFeatures());
1946 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1947 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1948 }
1949
1950 /**
1951 * Read switch port data from storage and write it into a switch object
1952 * @param sw the switch to update
1953 */
1954 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1955 OperatorPredicate op =
1956 new OperatorPredicate(PORT_SWITCH,
1957 OperatorPredicate.Operator.EQ,
1958 sw.getStringId());
1959 IResultSet portResultSet =
1960 storageSource.executeQuery(PORT_TABLE_NAME,
1961 null, op, null);
1962 //Map<Short, OFPhysicalPort> oldports =
1963 // new HashMap<Short, OFPhysicalPort>();
1964 //oldports.putAll(sw.getPorts());
1965
1966 while (portResultSet.next()) {
1967 try {
1968 OFPhysicalPort p = new OFPhysicalPort();
1969 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1970 p.setName(portResultSet.getString(PORT_NAME));
1971 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1972 p.setState((int)portResultSet.getLong(PORT_STATE));
1973 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1974 p.setHardwareAddress(HexString.fromHexString(portMac));
1975 p.setCurrentFeatures((int)portResultSet.
1976 getLong(PORT_CURRENT_FEATURES));
1977 p.setAdvertisedFeatures((int)portResultSet.
1978 getLong(PORT_ADVERTISED_FEATURES));
1979 p.setSupportedFeatures((int)portResultSet.
1980 getLong(PORT_SUPPORTED_FEATURES));
1981 p.setPeerFeatures((int)portResultSet.
1982 getLong(PORT_PEER_FEATURES));
1983 //oldports.remove(Short.valueOf(p.getPortNumber()));
1984 sw.setPort(p);
1985 } catch (NullPointerException e) {
1986 // ignore
1987 }
1988 }
1989 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1990 try {
1991 this.updates.put(update);
1992 } catch (InterruptedException e) {
1993 log.error("Failure adding update to queue", e);
1994 }
1995 }
1996
1997 protected void removePortInfo(IOFSwitch sw, short portNumber) {
1998 if (role == Role.SLAVE) {
1999 return;
2000 }
2001 String datapathIdString = sw.getStringId();
2002 String id = datapathIdString + "|" + portNumber;
2003 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
2004 }
2005
2006 /**
2007 * Sets the initial role based on properties in the config params.
2008 * It looks for two different properties.
2009 * If the "role" property is specified then the value should be
2010 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
2011 * controller is set to the specified value. If the "role" property
2012 * is not specified then it looks next for the "role.path" property.
2013 * In this case the value should be the path to a property file in
2014 * the file system that contains a property called "floodlight.role"
2015 * which can be one of the values listed above for the "role" property.
2016 * The idea behind the "role.path" mechanism is that you have some
2017 * separate heartbeat and master controller election algorithm that
2018 * determines the role of the controller. When a role transition happens,
2019 * it updates the current role in the file specified by the "role.path"
2020 * file. Then if floodlight restarts for some reason it can get the
2021 * correct current role of the controller from the file.
2022 * @param configParams The config params for the FloodlightProvider service
2023 * @return A valid role if role information is specified in the
2024 * config params, otherwise null
2025 */
2026 @LogMessageDocs({
2027 @LogMessageDoc(message="Controller role set to {role}",
2028 explanation="Setting the initial HA role to "),
2029 @LogMessageDoc(level="ERROR",
2030 message="Invalid current role value: {role}",
2031 explanation="An invalid HA role value was read from the " +
2032 "properties file",
2033 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2034 })
2035 protected Role getInitialRole(Map<String, String> configParams) {
2036 Role role = null;
2037 String roleString = configParams.get("role");
2038 if (roleString == null) {
2039 String rolePath = configParams.get("rolepath");
2040 if (rolePath != null) {
2041 Properties properties = new Properties();
2042 try {
2043 properties.load(new FileInputStream(rolePath));
2044 roleString = properties.getProperty("floodlight.role");
2045 }
2046 catch (IOException exc) {
2047 // Don't treat it as an error if the file specified by the
2048 // rolepath property doesn't exist. This lets us enable the
2049 // HA mechanism by just creating/setting the floodlight.role
2050 // property in that file without having to modify the
2051 // floodlight properties.
2052 }
2053 }
2054 }
2055
2056 if (roleString != null) {
2057 // Canonicalize the string to the form used for the enum constants
2058 roleString = roleString.trim().toUpperCase();
2059 try {
2060 role = Role.valueOf(roleString);
2061 }
2062 catch (IllegalArgumentException exc) {
2063 log.error("Invalid current role value: {}", roleString);
2064 }
2065 }
2066
2067 log.info("Controller role set to {}", role);
2068
2069 return role;
2070 }
2071
2072 /**
2073 * Tell controller that we're ready to accept switches loop
2074 * @throws IOException
2075 */
2076 @LogMessageDocs({
2077 @LogMessageDoc(message="Listening for switch connections on {address}",
2078 explanation="The controller is ready and listening for new" +
2079 " switch connections"),
2080 @LogMessageDoc(message="Storage exception in controller " +
2081 "updates loop; terminating process",
2082 explanation=ERROR_DATABASE,
2083 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2084 @LogMessageDoc(level="ERROR",
2085 message="Exception in controller updates loop",
2086 explanation="Failed to dispatch controller event",
2087 recommendation=LogMessageDoc.GENERIC_ACTION)
2088 })
2089 public void run() {
2090 if (log.isDebugEnabled()) {
2091 logListeners();
2092 }
2093
2094 try {
2095 final ServerBootstrap bootstrap = createServerBootStrap();
2096
2097 bootstrap.setOption("reuseAddr", true);
2098 bootstrap.setOption("child.keepAlive", true);
2099 bootstrap.setOption("child.tcpNoDelay", true);
2100 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2101
2102 ChannelPipelineFactory pfact =
2103 new OpenflowPipelineFactory(this, null);
2104 bootstrap.setPipelineFactory(pfact);
2105 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2106 final ChannelGroup cg = new DefaultChannelGroup();
2107 cg.add(bootstrap.bind(sa));
2108
2109 log.info("Listening for switch connections on {}", sa);
2110 } catch (Exception e) {
2111 throw new RuntimeException(e);
2112 }
2113
2114 // main loop
2115 while (true) {
2116 try {
2117 IUpdate update = updates.take();
2118 update.dispatch();
2119 } catch (InterruptedException e) {
2120 return;
2121 } catch (StorageException e) {
2122 log.error("Storage exception in controller " +
2123 "updates loop; terminating process", e);
2124 return;
2125 } catch (Exception e) {
2126 log.error("Exception in controller updates loop", e);
2127 }
2128 }
2129 }
2130
2131 private ServerBootstrap createServerBootStrap() {
2132 if (workerThreads == 0) {
2133 return new ServerBootstrap(
2134 new NioServerSocketChannelFactory(
2135 Executors.newCachedThreadPool(),
2136 Executors.newCachedThreadPool()));
2137 } else {
2138 return new ServerBootstrap(
2139 new NioServerSocketChannelFactory(
2140 Executors.newCachedThreadPool(),
2141 Executors.newCachedThreadPool(), workerThreads));
2142 }
2143 }
2144
2145 public void setConfigParams(Map<String, String> configParams) {
2146 String ofPort = configParams.get("openflowport");
2147 if (ofPort != null) {
2148 this.openFlowPort = Integer.parseInt(ofPort);
2149 }
2150 log.debug("OpenFlow port set to {}", this.openFlowPort);
2151 String threads = configParams.get("workerthreads");
2152 if (threads != null) {
2153 this.workerThreads = Integer.parseInt(threads);
2154 }
2155 log.debug("Number of worker threads set to {}", this.workerThreads);
2156 String controllerId = configParams.get("controllerid");
2157 if (controllerId != null) {
2158 this.controllerId = controllerId;
2159 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002160 else {
2161 //Try to get the hostname of the machine and use that for controller ID
2162 try {
2163 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2164 this.controllerId = hostname;
2165 } catch (UnknownHostException e) {
2166 // Can't get hostname, we'll just use the default
2167 }
2168 }
2169
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002170 log.debug("ControllerId set to {}", this.controllerId);
2171 }
2172
2173 private void initVendorMessages() {
2174 // Configure openflowj to be able to parse the role request/reply
2175 // vendor messages.
2176 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2177 OFNiciraVendorData.NX_VENDOR_ID, 4);
2178 OFVendorId.registerVendorId(niciraVendorId);
2179 OFBasicVendorDataType roleRequestVendorData =
2180 new OFBasicVendorDataType(
2181 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2182 OFRoleRequestVendorData.getInstantiable());
2183 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2184 OFBasicVendorDataType roleReplyVendorData =
2185 new OFBasicVendorDataType(
2186 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2187 OFRoleReplyVendorData.getInstantiable());
2188 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2189 }
2190
2191 /**
2192 * Initialize internal data structures
2193 */
2194 public void init(Map<String, String> configParams) {
2195 // These data structures are initialized here because other
2196 // module's startUp() might be called before ours
2197 this.messageListeners =
2198 new ConcurrentHashMap<OFType,
2199 ListenerDispatcher<OFType,
2200 IOFMessageListener>>();
2201 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2202 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2203 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2204 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2205 this.controllerNodeIPsCache = new HashMap<String, String>();
2206 this.updates = new LinkedBlockingQueue<IUpdate>();
2207 this.factory = new BasicFactory();
2208 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2209 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002210 //this.role = getInitialRole(configParams);
2211 //Set the controller's role to MASTER so it always tries to do role requests.
2212 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002213 this.roleChanger = new RoleChanger();
2214 initVendorMessages();
2215 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002216 }
2217
2218 /**
2219 * Startup all of the controller's components
2220 */
2221 @LogMessageDoc(message="Waiting for storage source",
2222 explanation="The system database is not yet ready",
2223 recommendation="If this message persists, this indicates " +
2224 "that the system database has failed to start. " +
2225 LogMessageDoc.CHECK_CONTROLLER)
2226 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002227 try {
2228 registryService.registerController(controllerId);
2229 } catch (RegistryException e2) {
2230 log.warn("Registry service error: {}", e2.getMessage());
2231 }
2232
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002233 // Create the table names we use
2234 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2235 storageSource.createTable(SWITCH_TABLE_NAME, null);
2236 storageSource.createTable(PORT_TABLE_NAME, null);
2237 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2238 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2239 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2240 CONTROLLER_ID);
2241 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2242 SWITCH_DATAPATH_ID);
2243 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2244 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2245 CONTROLLER_INTERFACE_ID);
2246 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2247
2248 while (true) {
2249 try {
2250 updateControllerInfo();
2251 break;
2252 }
2253 catch (StorageException e) {
2254 log.info("Waiting for storage source");
2255 try {
2256 Thread.sleep(1000);
2257 } catch (InterruptedException e1) {
2258 }
2259 }
2260 }
2261
2262 // Add our REST API
2263 restApi.addRestletRoutable(new CoreWebRoutable());
2264 }
2265
2266 @Override
2267 public void addInfoProvider(String type, IInfoProvider provider) {
2268 if (!providerMap.containsKey(type)) {
2269 providerMap.put(type, new ArrayList<IInfoProvider>());
2270 }
2271 providerMap.get(type).add(provider);
2272 }
2273
2274 @Override
2275 public void removeInfoProvider(String type, IInfoProvider provider) {
2276 if (!providerMap.containsKey(type)) {
2277 log.debug("Provider type {} doesn't exist.", type);
2278 return;
2279 }
2280
2281 providerMap.get(type).remove(provider);
2282 }
2283
2284 public Map<String, Object> getControllerInfo(String type) {
2285 if (!providerMap.containsKey(type)) return null;
2286
2287 Map<String, Object> result = new LinkedHashMap<String, Object>();
2288 for (IInfoProvider provider : providerMap.get(type)) {
2289 result.putAll(provider.getInfo(type));
2290 }
2291
2292 return result;
2293 }
2294
2295 @Override
2296 public void addHAListener(IHAListener listener) {
2297 this.haListeners.add(listener);
2298 }
2299
2300 @Override
2301 public void removeHAListener(IHAListener listener) {
2302 this.haListeners.remove(listener);
2303 }
2304
2305
2306 /**
2307 * Handle changes to the controller nodes IPs and dispatch update.
2308 */
2309 @SuppressWarnings("unchecked")
2310 protected void handleControllerNodeIPChanges() {
2311 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2312 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2313 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2314 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2315 CONTROLLER_INTERFACE_TYPE,
2316 CONTROLLER_INTERFACE_NUMBER,
2317 CONTROLLER_INTERFACE_DISCOVERED_IP };
2318 synchronized(controllerNodeIPsCache) {
2319 // We currently assume that interface Ethernet0 is the relevant
2320 // controller interface. Might change.
2321 // We could (should?) implement this using
2322 // predicates, but creating the individual and compound predicate
2323 // seems more overhead then just checking every row. Particularly,
2324 // since the number of rows is small and changes infrequent
2325 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2326 colNames,null, null);
2327 while (res.next()) {
2328 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2329 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2330 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2331 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2332 String curIP = controllerNodeIPsCache.get(controllerID);
2333
2334 curControllerNodeIPs.put(controllerID, discoveredIP);
2335 if (curIP == null) {
2336 // new controller node IP
2337 addedControllerNodeIPs.put(controllerID, discoveredIP);
2338 }
2339 else if (curIP != discoveredIP) {
2340 // IP changed
2341 removedControllerNodeIPs.put(controllerID, curIP);
2342 addedControllerNodeIPs.put(controllerID, discoveredIP);
2343 }
2344 }
2345 }
2346 // Now figure out if rows have been deleted. We can't use the
2347 // rowKeys from rowsDeleted directly, since the tables primary
2348 // key is a compound that we can't disassemble
2349 Set<String> curEntries = curControllerNodeIPs.keySet();
2350 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2351 removedEntries.removeAll(curEntries);
2352 for (String removedControllerID : removedEntries)
2353 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2354 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2355 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2356 curControllerNodeIPs, addedControllerNodeIPs,
2357 removedControllerNodeIPs);
2358 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2359 try {
2360 this.updates.put(update);
2361 } catch (InterruptedException e) {
2362 log.error("Failure adding update to queue", e);
2363 }
2364 }
2365 }
2366 }
2367
2368 @Override
2369 public Map<String, String> getControllerNodeIPs() {
2370 // We return a copy of the mapping so we can guarantee that
2371 // the mapping return is the same as one that will be (or was)
2372 // dispatched to IHAListeners
2373 HashMap<String,String> retval = new HashMap<String,String>();
2374 synchronized(controllerNodeIPsCache) {
2375 retval.putAll(controllerNodeIPsCache);
2376 }
2377 return retval;
2378 }
2379
2380 @Override
2381 public void rowsModified(String tableName, Set<Object> rowKeys) {
2382 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2383 handleControllerNodeIPChanges();
2384 }
2385
2386 }
2387
2388 @Override
2389 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2390 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2391 handleControllerNodeIPChanges();
2392 }
2393 }
2394
2395 @Override
2396 public long getSystemStartTime() {
2397 return (this.systemStartTime);
2398 }
2399
2400 @Override
2401 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2402 this.alwaysClearFlowsOnSwAdd = value;
2403 }
2404
2405 public boolean getAlwaysClearFlowsOnSwAdd() {
2406 return this.alwaysClearFlowsOnSwAdd;
2407 }
2408}