Umesh Krishnaswamy | 345ee99 | 2012-12-13 20:29:48 -0800 | [diff] [blame] | 1 | package net.floodlightcontroller.core.internal; |
| 2 | |
| 3 | import java.io.IOException; |
| 4 | import java.util.ArrayList; |
| 5 | import java.util.Collection; |
| 6 | import java.util.Iterator; |
| 7 | import java.util.concurrent.DelayQueue; |
| 8 | import java.util.concurrent.Delayed; |
| 9 | import java.util.concurrent.TimeUnit; |
| 10 | |
| 11 | import org.slf4j.Logger; |
| 12 | import org.slf4j.LoggerFactory; |
| 13 | |
| 14 | import net.floodlightcontroller.core.IFloodlightProviderService.Role; |
| 15 | import net.floodlightcontroller.core.IOFSwitch; |
| 16 | import net.floodlightcontroller.core.annotations.LogMessageDoc; |
| 17 | |
| 18 | /** |
| 19 | * This class handles sending of RoleRequest messages to all connected switches. |
| 20 | * |
| 21 | * Handling Role Requests is tricky. Roles are hard state on the switch and |
| 22 | * we can't query it so we need to make sure that we have consistent states |
| 23 | * on the switches. Whenever we send a role request to the set of connected |
| 24 | * switches we need to make sure that we've sent the request to all of them |
| 25 | * before we process the next change request. If a new switch connects, we |
| 26 | * need to send it the current role and need to make sure that the current |
| 27 | * role doesn't change while we are doing it. We achieve this by synchronizing |
| 28 | * all these actions on Controller.roleChanger |
| 29 | * On the receive side: we need to make sure that we receive a reply for each |
| 30 | * request we send and that the reply is consistent with the request we sent. |
| 31 | * We'd also like to send the role request to the switch asynchronously in a |
| 32 | * separate thread so we don't block the REST API or other callers. |
| 33 | * |
| 34 | * There are potential ways to relax these synchronization requirements: |
| 35 | * - "Generation ID" for each role request. However, this would be most useful |
| 36 | * if it were global for the whole cluster |
| 37 | * - Regularly resend the controller's current role. Don't know whether this |
| 38 | * might have adverse effects on the switch. |
| 39 | * |
| 40 | * Caveats: |
| 41 | * - No way to know if another controller (not in our controller cluster) |
| 42 | * sends MASTER requests to connected switches. Then we would drop to |
| 43 | * slave role without knowing it. Could regularly resend the current role. |
| 44 | * Ideally the switch would notify us if it demoted us. What happens if |
| 45 | * the other controller also regularly resends the same role request? |
| 46 | * Or if the health check determines that |
| 47 | * a controller is dead but the controller is still talking to switches (maybe |
| 48 | * just its health check failed) and resending the master role request.... |
| 49 | * We could try to detect if a switch demoted us to slave even if we think |
| 50 | * we are master (error messages on packet outs, e.g., when sending LLDPs) |
| 51 | * |
| 52 | * |
| 53 | * The general model of Role Request handling is as follows: |
| 54 | * |
| 55 | * - All role request messages are handled by this class. Class Controller |
| 56 | * submits a role change request and the request gets queued. submitRequest |
| 57 | * takes a Collection of switches to which to send the request. We make a copy |
| 58 | * of this list. |
| 59 | * - A thread takes these change requests from the queue and sends them to |
| 60 | * all the switches (using our copy of the switch list). |
| 61 | * - The OFSwitchImpl sends the request over the wire and puts the request |
| 62 | * into a queue of pending requests (storing xid and role). We start a timeout |
| 63 | * to make sure we eventually receive a reply from the switch. We use a single |
| 64 | * timeout for each request submitted using submitRequest() |
| 65 | * - After the timeout triggers we go over the list of switches again and |
| 66 | * check that a response has been received (by checking the head of the |
| 67 | * OFSwitchImpl's queue of pending requests) |
| 68 | * - We handle requests and timeouts in the same thread. We use a priority queue |
| 69 | * to schedule them so we are guaranteed that they are processed in |
| 70 | * the same order as they are submitted. If a request times out we drop |
| 71 | * the connection to this switch. |
| 72 | * - Since we decouple submission of role change requests and actually sending |
| 73 | * them we cannot check a received role reply against the controller's current |
| 74 | * role because the controller's current role could have changed again. |
| 75 | * - Receiving Role Reply messages is handled by OFChannelHandler and |
| 76 | * OFSwitchImpl directly. The OFSwitchImpl checks if the received reply |
| 77 | * is as expected (xid and role match the head of the pending queue in |
| 78 | * OFSwitchImpl). If so |
| 79 | * the switch updates its role. Otherwise the connection is dropped. If this |
| 80 | * is the first reply, the SWITCH_SUPPORTS_NX_ROLE attribute is set. |
| 81 | * Next, we call addSwitch(), removeSwitch() to update the list of active |
| 82 | * switches if appropriate. |
| 83 | * - If we receive an Error indicating that roles are not supported by the |
| 84 | * switch, we set the SWITCH_SUPPORTS_NX_ROLE to false. We keep the |
| 85 | * switch connection alive while in MASTER and EQUAL role. |
| 86 | * (TODO: is this the right behavior for EQUAL??). If the role changes to |
| 87 | * SLAVE the switch connection is dropped (remember: only if the switch |
| 88 | * doesn't support role requests) |
| 89 | * The expected behavior is that the switch will probably try to reconnect |
| 90 | * repeatedly (with some sort of exponential backoff), but after a while |
| 91 | * will give up and move on to the next controller-IP configured on the |
| 92 | * switch. This is the serial failover mechanism from OpenFlow spec v1.0. |
| 93 | * |
| 94 | * New switch connection: |
| 95 | * - Switch handshake is done without sending any role request messages. |
| 96 | * - After handshake completes, switch is added to the list of connected switches |
| 97 | * and we send the first role request message if role |
| 98 | * requests are enabled. If roles are disabled automatically promote switch to |
| 99 | * active switch list and clear FlowTable. |
| 100 | * - When we receive the first reply we proceed as above. In addition, if |
| 101 | * the role request is for MASTER we wipe the flow table. We do not wipe |
| 102 | * the flow table if the switch connected while role support was disabled |
| 103 | * on the controller. |
| 104 | * |
| 105 | */ |
| 106 | public class RoleChanger { |
| 107 | // FIXME: Upon closer inspection DelayQueue seems to be somewhat broken. |
| 108 | // We are required to implement a compareTo based on getDelay() and |
| 109 | // getDelay() must return the remaining delay, thus it needs to use the |
| 110 | // current time. So x1.compareTo(x1) can never return 0 as some time |
| 111 | // will have passed between evaluating both getDelays(). This is even worse |
| 112 | // if the thread happens to be preempted between calling the getDelay() |
| 113 | // For the time being we enforce a small delay between subsequent |
| 114 | // role request messages and hope that's long enough to not screw up |
| 115 | // ordering. In the long run we might want to use two threads and two queues |
| 116 | // (one for requests, one for timeouts) |
| 117 | // Sigh. |
| 118 | protected DelayQueue<RoleChangeTask> pendingTasks; |
| 119 | protected long lastSubmitTime; |
| 120 | protected Thread workerThread; |
| 121 | protected long timeout; |
| 122 | protected static long DEFAULT_TIMEOUT = 15L*1000*1000*1000L; // 15s |
| 123 | protected static Logger log = LoggerFactory.getLogger(RoleChanger.class); |
| 124 | /** |
| 125 | * A queued task to be handled by the Role changer thread. |
| 126 | */ |
| 127 | protected static class RoleChangeTask implements Delayed { |
| 128 | protected enum Type { |
| 129 | /** This is a request. Dispatch the role update to switches */ |
| 130 | REQUEST, |
| 131 | /** This is a timeout task. Check if all switches have |
| 132 | correctly replied to the previously dispatched role request */ |
| 133 | TIMEOUT |
| 134 | } |
| 135 | // The set of switches to work on |
| 136 | public Collection<OFSwitchImpl> switches; |
| 137 | public Role role; |
| 138 | public Type type; |
| 139 | // the time when the task should run as nanoTime() |
| 140 | public long deadline; |
| 141 | public RoleChangeTask(Collection<OFSwitchImpl> switches, Role role, long deadline) { |
| 142 | this.switches = switches; |
| 143 | this.role = role; |
| 144 | this.type = Type.REQUEST; |
| 145 | this.deadline = deadline; |
| 146 | } |
| 147 | @Override |
| 148 | public int compareTo(Delayed o) { |
| 149 | Long timeRemaining = getDelay(TimeUnit.NANOSECONDS); |
| 150 | return timeRemaining.compareTo(o.getDelay(TimeUnit.NANOSECONDS)); |
| 151 | } |
| 152 | @Override |
| 153 | public long getDelay(TimeUnit tu) { |
| 154 | long timeRemaining = deadline - System.nanoTime(); |
| 155 | return tu.convert(timeRemaining, TimeUnit.NANOSECONDS); |
| 156 | } |
| 157 | } |
| 158 | |
| 159 | @LogMessageDoc(level="ERROR", |
| 160 | message="RoleRequestWorker task had an uncaught exception.", |
| 161 | explanation="An unknown occured while processing an HA " + |
| 162 | "role change event.", |
| 163 | recommendation=LogMessageDoc.GENERIC_ACTION) |
| 164 | protected class RoleRequestWorker extends Thread { |
| 165 | @Override |
| 166 | public void run() { |
| 167 | RoleChangeTask t; |
| 168 | boolean interrupted = false; |
| 169 | log.trace("RoleRequestWorker thread started"); |
| 170 | try { |
| 171 | while (true) { |
| 172 | try { |
| 173 | t = pendingTasks.take(); |
| 174 | } catch (InterruptedException e) { |
| 175 | // see http://www.ibm.com/developerworks/java/library/j-jtp05236/index.html |
| 176 | interrupted = true; |
| 177 | continue; |
| 178 | } |
| 179 | if (t.type == RoleChangeTask.Type.REQUEST) { |
| 180 | sendRoleRequest(t.switches, t.role, t.deadline); |
| 181 | // Queue the timeout |
| 182 | t.type = RoleChangeTask.Type.TIMEOUT; |
| 183 | t.deadline += timeout; |
| 184 | pendingTasks.put(t); |
| 185 | } |
| 186 | else { |
| 187 | verifyRoleReplyReceived(t.switches, t.deadline); |
| 188 | } |
| 189 | } |
| 190 | } |
| 191 | catch (Exception e) { |
| 192 | // Should never get here |
| 193 | log.error("RoleRequestWorker task had an uncaught exception. ", |
| 194 | e); |
| 195 | } |
| 196 | finally { |
| 197 | // Be nice in case we earlier caught InterruptedExecution |
| 198 | if (interrupted) |
| 199 | Thread.currentThread().interrupt(); |
| 200 | } |
| 201 | } // end loop |
| 202 | } |
| 203 | |
| 204 | public RoleChanger() { |
| 205 | this.pendingTasks = new DelayQueue<RoleChangeTask>(); |
| 206 | this.workerThread = new Thread(new RoleRequestWorker()); |
| 207 | this.timeout = DEFAULT_TIMEOUT; |
| 208 | this.workerThread.start(); |
| 209 | } |
| 210 | |
| 211 | |
| 212 | public synchronized void submitRequest(Collection<OFSwitchImpl> switches, Role role) { |
| 213 | long deadline = System.nanoTime(); |
| 214 | // Grrr. stupid DelayQueue. Make sre we have at least 10ms between |
| 215 | // role request messages. |
| 216 | if (deadline - lastSubmitTime < 10 * 1000*1000) |
| 217 | deadline = lastSubmitTime + 10 * 1000*1000; |
| 218 | // make a copy of the list |
| 219 | ArrayList<OFSwitchImpl> switches_copy = new ArrayList<OFSwitchImpl>(switches); |
| 220 | RoleChangeTask req = new RoleChangeTask(switches_copy, role, deadline); |
| 221 | pendingTasks.put(req); |
| 222 | lastSubmitTime = deadline; |
| 223 | } |
| 224 | |
| 225 | /** |
| 226 | * Send a role request message to switches. This checks the capabilities |
| 227 | * of the switch for understanding role request messaging. Currently we only |
| 228 | * support the OVS-style role request message, but once the controller |
| 229 | * supports OF 1.2, this function will also handle sending out the |
| 230 | * OF 1.2-style role request message. |
| 231 | * @param switches the collection of switches to send the request too |
| 232 | * @param role the role to request |
| 233 | */ |
| 234 | @LogMessageDoc(level="WARN", |
| 235 | message="Failed to send role request message " + |
| 236 | "to switch {switch}: {message}. Disconnecting", |
| 237 | explanation="An I/O error occurred while attempting to change " + |
| 238 | "the switch HA role.", |
| 239 | recommendation=LogMessageDoc.CHECK_SWITCH) |
| 240 | protected void sendRoleRequest(Collection<OFSwitchImpl> switches, |
| 241 | Role role, long cookie) { |
| 242 | // There are three cases to consider: |
| 243 | // |
| 244 | // 1) If the controller role at the point the switch connected was |
| 245 | // null/disabled, then we never sent the role request probe to the |
| 246 | // switch and therefore never set the SWITCH_SUPPORTS_NX_ROLE |
| 247 | // attribute for the switch, so supportsNxRole is null. In that |
| 248 | // case since we're now enabling role support for the controller |
| 249 | // we should send out the role request probe/update to the switch. |
| 250 | // |
| 251 | // 2) If supportsNxRole == Boolean.TRUE then that means we've already |
| 252 | // sent the role request probe to the switch and it replied with |
| 253 | // a role reply message, so we know it supports role request |
| 254 | // messages. Now we're changing the role and we want to send |
| 255 | // it another role request message to inform it of the new role |
| 256 | // for the controller. |
| 257 | // |
| 258 | // 3) If supportsNxRole == Boolean.FALSE, then that means we sent the |
| 259 | // role request probe to the switch but it responded with an error |
| 260 | // indicating that it didn't understand the role request message. |
| 261 | // In that case we don't want to send it another role request that |
| 262 | // it (still) doesn't understand. But if the new role of the |
| 263 | // controller is SLAVE, then we don't want the switch to remain |
| 264 | // connected to this controller. It might support the older serial |
| 265 | // failover model for HA support, so we want to terminate the |
| 266 | // connection and get it to initiate a connection with another |
| 267 | // controller in its list of controllers. Eventually (hopefully, if |
| 268 | // things are configured correctly) it will walk down its list of |
| 269 | // controllers and connect to the current master controller. |
| 270 | Iterator<OFSwitchImpl> iter = switches.iterator(); |
| 271 | while(iter.hasNext()) { |
| 272 | OFSwitchImpl sw = iter.next(); |
| 273 | try { |
| 274 | Boolean supportsNxRole = (Boolean) |
| 275 | sw.getAttribute(IOFSwitch.SWITCH_SUPPORTS_NX_ROLE); |
| 276 | if ((supportsNxRole == null) || supportsNxRole) { |
| 277 | // Handle cases #1 and #2 |
| 278 | sw.sendNxRoleRequest(role, cookie); |
| 279 | } else { |
| 280 | // Handle case #3 |
| 281 | if (role == Role.SLAVE) { |
| 282 | log.debug("Disconnecting switch {} that doesn't support " + |
| 283 | "role request messages from a controller that went to SLAVE mode"); |
| 284 | // Closing the channel should result in a call to |
| 285 | // channelDisconnect which updates all state |
| 286 | sw.getChannel().close(); |
| 287 | iter.remove(); |
| 288 | } |
| 289 | } |
| 290 | } catch (IOException e) { |
| 291 | log.warn("Failed to send role request message " + |
| 292 | "to switch {}: {}. Disconnecting", |
| 293 | sw, e); |
| 294 | sw.getChannel().close(); |
| 295 | iter.remove(); |
| 296 | } |
| 297 | } |
| 298 | } |
| 299 | |
| 300 | /** |
| 301 | * Verify that switches have received a role reply message we sent earlier |
| 302 | * @param switches the collection of switches to send the request too |
| 303 | * @param cookie the cookie of the request |
| 304 | */ |
| 305 | @LogMessageDoc(level="WARN", |
| 306 | message="Timeout while waiting for role reply from switch {switch}." |
| 307 | + " Disconnecting", |
| 308 | explanation="Timed out waiting for the switch to respond to " + |
| 309 | "a request to change the HA role.", |
| 310 | recommendation=LogMessageDoc.CHECK_SWITCH) |
| 311 | protected void verifyRoleReplyReceived(Collection<OFSwitchImpl> switches, |
| 312 | long cookie) { |
| 313 | for (OFSwitchImpl sw: switches) { |
| 314 | if (sw.checkFirstPendingRoleRequestCookie(cookie)) { |
| 315 | sw.getChannel().close(); |
| 316 | log.warn("Timeout while waiting for role reply from switch {}." |
| 317 | + " Disconnecting", sw); |
| 318 | } |
| 319 | } |
| 320 | } |
| 321 | } |