blob: ae998ece81e044c39ad09cd39c07ab112e122836 [file] [log] [blame]
/**
* Copyright 2012, Big Switch Networks, Inc.
* Originally created by David Erickson, Stanford University
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.IOException;
21import java.net.SocketAddress;
22import java.util.ArrayList;
23import java.util.Collection;
24import java.util.Collections;
25import java.util.Date;
26import java.util.HashMap;
27import java.util.LinkedList;
28import java.util.List;
29import java.util.Map;
30import java.util.concurrent.ConcurrentHashMap;
31import java.util.concurrent.ConcurrentMap;
32import java.util.concurrent.Future;
33import java.util.concurrent.atomic.AtomicInteger;
34import java.util.concurrent.locks.Lock;
35import java.util.concurrent.locks.ReentrantReadWriteLock;
36
37import net.floodlightcontroller.core.FloodlightContext;
38import net.floodlightcontroller.core.IFloodlightProviderService;
39import net.floodlightcontroller.core.IOFMessageListener;
40import net.floodlightcontroller.core.IFloodlightProviderService.Role;
41import net.floodlightcontroller.core.IOFSwitch;
42import net.floodlightcontroller.core.annotations.LogMessageDoc;
43import net.floodlightcontroller.core.annotations.LogMessageDocs;
44import net.floodlightcontroller.core.web.serializers.DPIDSerializer;
45import net.floodlightcontroller.threadpool.IThreadPoolService;
46import net.floodlightcontroller.util.TimedCache;
HIGUCHI Yuta7677a6f2013-06-14 14:13:35 -070047import net.onrc.onos.ofcontroller.core.IOnosRemoteSwitch;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080048
49import org.codehaus.jackson.annotate.JsonIgnore;
50import org.codehaus.jackson.annotate.JsonProperty;
51import org.codehaus.jackson.map.annotate.JsonSerialize;
Pankaj Berde5024ec12013-01-31 17:07:29 -080052import org.codehaus.jackson.map.ser.std.ToStringSerializer;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080053import org.jboss.netty.channel.Channel;
54import org.openflow.protocol.OFFeaturesReply;
55import org.openflow.protocol.OFFeaturesRequest;
56import org.openflow.protocol.OFFlowMod;
57import org.openflow.protocol.OFMatch;
58import org.openflow.protocol.OFMessage;
59import org.openflow.protocol.OFPhysicalPort;
60import org.openflow.protocol.OFPort;
61import org.openflow.protocol.OFType;
62import org.openflow.protocol.OFVendor;
63import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
64import org.openflow.protocol.OFPhysicalPort.OFPortState;
65import org.openflow.protocol.OFStatisticsRequest;
66import org.openflow.protocol.statistics.OFDescriptionStatistics;
67import org.openflow.protocol.statistics.OFStatistics;
68import org.openflow.util.HexString;
69import org.openflow.util.U16;
70import org.openflow.vendor.nicira.OFNiciraVendorData;
71import org.openflow.vendor.nicira.OFRoleRequestVendorData;
72import org.openflow.vendor.nicira.OFRoleVendorData;
73import org.slf4j.Logger;
74import org.slf4j.LoggerFactory;
75
76/**
77 * This is the internal representation of an openflow switch.
78 */
HIGUCHI Yuta7677a6f2013-06-14 14:13:35 -070079public class OFSwitchImpl implements IOFSwitch, IOnosRemoteSwitch {
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080080 // TODO: should we really do logging in the class or should we throw
81 // exception that can then be handled by callers?
82 protected static Logger log = LoggerFactory.getLogger(OFSwitchImpl.class);
83
84 private static final String HA_CHECK_SWITCH =
85 "Check the health of the indicated switch. If the problem " +
86 "persists or occurs repeatedly, it likely indicates a defect " +
87 "in the switch HA implementation.";
88
89 protected ConcurrentMap<Object, Object> attributes;
90 protected IFloodlightProviderService floodlightProvider;
91 protected IThreadPoolService threadPool;
92 protected Date connectedSince;
93 protected String stringId;
94 protected Channel channel;
95 protected AtomicInteger transactionIdSource;
96 // Lock to protect modification of the port maps. We only need to
97 // synchronize on modifications. For read operations we are fine since
98 // we rely on ConcurrentMaps which works for our use case.
99 private Object portLock;
100 // Map port numbers to the appropriate OFPhysicalPort
101 protected ConcurrentHashMap<Short, OFPhysicalPort> portsByNumber;
102 // Map port names to the appropriate OFPhyiscalPort
103 // XXX: The OF spec doesn't specify if port names need to be unique but
104 // according it's always the case in practice.
105 protected ConcurrentHashMap<String, OFPhysicalPort> portsByName;
106 protected Map<Integer,OFStatisticsFuture> statsFutureMap;
107 protected Map<Integer, IOFMessageListener> iofMsgListenersMap;
108 protected Map<Integer,OFFeaturesReplyFuture> featuresFutureMap;
109 protected boolean connected;
110 protected Role role;
111 protected TimedCache<Long> timedCache;
112 protected ReentrantReadWriteLock listenerLock;
113 protected ConcurrentMap<Short, Long> portBroadcastCacheHitMap;
114 /**
115 * When sending a role request message, the role request is added
116 * to this queue. If a role reply is received this queue is checked to
117 * verify that the reply matches the expected reply. We require in order
118 * delivery of replies. That's why we use a Queue.
119 * The RoleChanger uses a timeout to ensure we receive a timely reply.
120 *
121 * Need to synchronize on this instance if a request is sent, received,
122 * checked.
123 */
124 protected LinkedList<PendingRoleRequestEntry> pendingRoleRequests;
125
126 /* Switch features from initial featuresReply */
127 protected int capabilities;
128 protected int buffers;
129 protected int actions;
130 protected byte tables;
131 protected long datapathId;
132
133 public static IOFSwitchFeatures switchFeatures;
134 protected static final ThreadLocal<Map<OFSwitchImpl,List<OFMessage>>> local_msg_buffer =
135 new ThreadLocal<Map<OFSwitchImpl,List<OFMessage>>>() {
136 @Override
137 protected Map<OFSwitchImpl,List<OFMessage>> initialValue() {
138 return new HashMap<OFSwitchImpl,List<OFMessage>>();
139 }
140 };
141
142 // for managing our map sizes
143 protected static final int MAX_MACS_PER_SWITCH = 1000;
144
145 protected static class PendingRoleRequestEntry {
146 protected int xid;
147 protected Role role;
148 // cookie is used to identify the role "generation". roleChanger uses
149 protected long cookie;
150 public PendingRoleRequestEntry(int xid, Role role, long cookie) {
151 this.xid = xid;
152 this.role = role;
153 this.cookie = cookie;
154 }
155 }
156
157 public OFSwitchImpl() {
158 this.stringId = null;
159 this.attributes = new ConcurrentHashMap<Object, Object>();
160 this.connectedSince = new Date();
161 this.transactionIdSource = new AtomicInteger();
162 this.portLock = new Object();
163 this.portsByNumber = new ConcurrentHashMap<Short, OFPhysicalPort>();
164 this.portsByName = new ConcurrentHashMap<String, OFPhysicalPort>();
165 this.connected = true;
166 this.statsFutureMap = new ConcurrentHashMap<Integer,OFStatisticsFuture>();
167 this.featuresFutureMap = new ConcurrentHashMap<Integer,OFFeaturesReplyFuture>();
168 this.iofMsgListenersMap = new ConcurrentHashMap<Integer,IOFMessageListener>();
169 this.role = null;
170 this.timedCache = new TimedCache<Long>(100, 5*1000 ); // 5 seconds interval
171 this.listenerLock = new ReentrantReadWriteLock();
172 this.portBroadcastCacheHitMap = new ConcurrentHashMap<Short, Long>();
173 this.pendingRoleRequests = new LinkedList<OFSwitchImpl.PendingRoleRequestEntry>();
174
175 // Defaults properties for an ideal switch
176 this.setAttribute(PROP_FASTWILDCARDS, OFMatch.OFPFW_ALL);
177 this.setAttribute(PROP_SUPPORTS_OFPP_FLOOD, new Boolean(true));
178 this.setAttribute(PROP_SUPPORTS_OFPP_TABLE, new Boolean(true));
179 }
180
181
182 @Override
183 public Object getAttribute(String name) {
184 if (this.attributes.containsKey(name)) {
185 return this.attributes.get(name);
186 }
187 return null;
188 }
189
190 @Override
191 public void setAttribute(String name, Object value) {
192 this.attributes.put(name, value);
193 return;
194 }
195
196 @Override
197 public Object removeAttribute(String name) {
198 return this.attributes.remove(name);
199 }
200
201 @Override
202 public boolean hasAttribute(String name) {
203 return this.attributes.containsKey(name);
204 }
205
206 @Override
207 @JsonIgnore
208 public Channel getChannel() {
209 return this.channel;
210 }
211
212 @JsonIgnore
213 public void setChannel(Channel channel) {
214 this.channel = channel;
215 }
216
217 @Override
218 public void write(OFMessage m, FloodlightContext bc) throws IOException {
219 Map<OFSwitchImpl,List<OFMessage>> msg_buffer_map = local_msg_buffer.get();
220 List<OFMessage> msg_buffer = msg_buffer_map.get(this);
221 if (msg_buffer == null) {
222 msg_buffer = new ArrayList<OFMessage>();
223 msg_buffer_map.put(this, msg_buffer);
224 }
225
226 this.floodlightProvider.handleOutgoingMessage(this, m, bc);
227 msg_buffer.add(m);
228
229 if ((msg_buffer.size() >= Controller.BATCH_MAX_SIZE) ||
230 ((m.getType() != OFType.PACKET_OUT) && (m.getType() != OFType.FLOW_MOD))) {
231 this.write(msg_buffer);
232 msg_buffer.clear();
233 }
234 }
235
236 @Override
237 @LogMessageDoc(level="WARN",
238 message="Sending OF message that modifies switch " +
239 "state while in the slave role: {switch}",
240 explanation="An application has sent a message to a switch " +
241 "that is not valid when the switch is in a slave role",
242 recommendation=LogMessageDoc.REPORT_CONTROLLER_BUG)
243 public void write(List<OFMessage> msglist,
244 FloodlightContext bc) throws IOException {
245 for (OFMessage m : msglist) {
246 if (role == Role.SLAVE) {
247 switch (m.getType()) {
248 case PACKET_OUT:
249 case FLOW_MOD:
250 case PORT_MOD:
251 log.warn("Sending OF message that modifies switch " +
252 "state while in the slave role: {}",
253 m.getType().name());
254 break;
255 default:
256 break;
257 }
258 }
259 this.floodlightProvider.handleOutgoingMessage(this, m, bc);
260 }
261 this.write(msglist);
262 }
263
264 public void write(List<OFMessage> msglist) throws IOException {
265 this.channel.write(msglist);
266 }
267
268 @Override
269 public void disconnectOutputStream() {
270 channel.close();
271 }
HIGUCHI Yuta2995eb52013-06-14 12:50:10 -0700272
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800273 @Override
274 @JsonIgnore
275 public void setFeaturesReply(OFFeaturesReply featuresReply) {
276 synchronized(portLock) {
277 if (stringId == null) {
278 /* ports are updated via port status message, so we
279 * only fill in ports on initial connection.
280 */
281 for (OFPhysicalPort port : featuresReply.getPorts()) {
282 setPort(port);
283 }
284 }
285 this.datapathId = featuresReply.getDatapathId();
286 this.capabilities = featuresReply.getCapabilities();
287 this.buffers = featuresReply.getBuffers();
288 this.actions = featuresReply.getActions();
289 this.tables = featuresReply.getTables();
290 this.stringId = HexString.toHexString(this.datapathId);
291 }
292 }
293
294 @Override
295 @JsonIgnore
296 public Collection<OFPhysicalPort> getEnabledPorts() {
297 List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
298 for (OFPhysicalPort port : portsByNumber.values()) {
299 if (portEnabled(port)) {
300 result.add(port);
301 }
302 }
303 return result;
304 }
305
306 @Override
307 @JsonIgnore
308 public Collection<Short> getEnabledPortNumbers() {
309 List<Short> result = new ArrayList<Short>();
310 for (OFPhysicalPort port : portsByNumber.values()) {
311 if (portEnabled(port)) {
312 result.add(port.getPortNumber());
313 }
314 }
315 return result;
316 }
317
318 @Override
319 public OFPhysicalPort getPort(short portNumber) {
320 return portsByNumber.get(portNumber);
321 }
322
323 @Override
324 public OFPhysicalPort getPort(String portName) {
325 return portsByName.get(portName);
326 }
327
328 @Override
329 @JsonIgnore
330 public void setPort(OFPhysicalPort port) {
331 synchronized(portLock) {
332 portsByNumber.put(port.getPortNumber(), port);
333 portsByName.put(port.getName(), port);
334 }
335 }
336
337 @Override
338 @JsonProperty("ports")
339 public Collection<OFPhysicalPort> getPorts() {
340 return Collections.unmodifiableCollection(portsByNumber.values());
341 }
342
343 @Override
344 public void deletePort(short portNumber) {
345 synchronized(portLock) {
346 portsByName.remove(portsByNumber.get(portNumber).getName());
347 portsByNumber.remove(portNumber);
348 }
349 }
350
351 @Override
352 public void deletePort(String portName) {
353 synchronized(portLock) {
354 portsByNumber.remove(portsByName.get(portName).getPortNumber());
355 portsByName.remove(portName);
356 }
357 }
358
359 @Override
360 public boolean portEnabled(short portNumber) {
361 if (portsByNumber.get(portNumber) == null) return false;
362 return portEnabled(portsByNumber.get(portNumber));
363 }
364
365 @Override
366 public boolean portEnabled(String portName) {
367 if (portsByName.get(portName) == null) return false;
368 return portEnabled(portsByName.get(portName));
369 }
370
371 @Override
372 public boolean portEnabled(OFPhysicalPort port) {
373 if (port == null)
374 return false;
375 if ((port.getConfig() & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0)
376 return false;
377 if ((port.getState() & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0)
378 return false;
379 // Port STP state doesn't work with multiple VLANs, so ignore it for now
380 //if ((port.getState() & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue())
381 // return false;
382 return true;
383 }
384
385 @Override
386 @JsonSerialize(using=DPIDSerializer.class)
387 @JsonProperty("dpid")
388 public long getId() {
389 if (this.stringId == null)
390 throw new RuntimeException("Features reply has not yet been set");
391 return this.datapathId;
392 }
393
394 @JsonIgnore
395 @Override
396 public String getStringId() {
397 return stringId;
398 }
399
400 /* (non-Javadoc)
401 * @see java.lang.Object#toString()
402 */
403 @Override
404 public String toString() {
405 return "OFSwitchImpl [" + channel.getRemoteAddress() + " DPID[" + ((stringId != null) ? stringId : "?") + "]]";
406 }
407
408 @Override
409 public ConcurrentMap<Object, Object> getAttributes() {
410 return this.attributes;
411 }
412
413 @Override
414 public Date getConnectedSince() {
415 return connectedSince;
416 }
417
418 @JsonIgnore
419 @Override
420 public int getNextTransactionId() {
421 return this.transactionIdSource.incrementAndGet();
422 }
423
424 @Override
425 public void sendStatsQuery(OFStatisticsRequest request, int xid,
426 IOFMessageListener caller) throws IOException {
427 request.setXid(xid);
428 this.iofMsgListenersMap.put(xid, caller);
429 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
430 msglist.add(request);
431 this.channel.write(msglist);
432 return;
433 }
434
435 @Override
436 public Future<List<OFStatistics>> getStatistics(OFStatisticsRequest request) throws IOException {
437 request.setXid(getNextTransactionId());
438 OFStatisticsFuture future = new OFStatisticsFuture(threadPool, this, request.getXid());
439 this.statsFutureMap.put(request.getXid(), future);
440 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
441 msglist.add(request);
442 this.channel.write(msglist);
443 return future;
444 }
445
446 @Override
447 public void deliverStatisticsReply(OFMessage reply) {
448 OFStatisticsFuture future = this.statsFutureMap.get(reply.getXid());
449 if (future != null) {
450 future.deliverFuture(this, reply);
451 // The future will ultimately unregister itself and call
452 // cancelStatisticsReply
453 return;
454 }
455 /* Transaction id was not found in statsFutureMap.check the other map */
456 IOFMessageListener caller = this.iofMsgListenersMap.get(reply.getXid());
457 if (caller != null) {
458 caller.receive(this, reply, null);
459 }
460 }
461
462 @Override
463 public void cancelStatisticsReply(int transactionId) {
464 if (null == this.statsFutureMap.remove(transactionId)) {
465 this.iofMsgListenersMap.remove(transactionId);
466 }
467 }
468
469 @Override
470 public void cancelAllStatisticsReplies() {
471 /* we don't need to be synchronized here. Even if another thread
472 * modifies the map while we're cleaning up the future will eventuall
473 * timeout */
474 for (OFStatisticsFuture f : statsFutureMap.values()) {
475 f.cancel(true);
476 }
477 statsFutureMap.clear();
478 iofMsgListenersMap.clear();
479 }
480
481
482 /**
483 * @param floodlightProvider the floodlightProvider to set
484 */
485 @JsonIgnore
486 public void setFloodlightProvider(IFloodlightProviderService floodlightProvider) {
487 this.floodlightProvider = floodlightProvider;
488 }
489
490 @JsonIgnore
491 public void setThreadPoolService(IThreadPoolService tp) {
492 this.threadPool = tp;
493 }
494
495 @JsonIgnore
496 @Override
497 public synchronized boolean isConnected() {
498 return connected;
499 }
500
501 @Override
502 @JsonIgnore
503 public synchronized void setConnected(boolean connected) {
504 this.connected = connected;
505 }
506
507 @Override
508 public Role getRole() {
509 return role;
510 }
511
512 @JsonIgnore
513 @Override
514 public boolean isActive() {
515 return (role != Role.SLAVE);
516 }
517
518 @Override
519 @JsonIgnore
520 public void setSwitchProperties(OFDescriptionStatistics description) {
521 if (switchFeatures != null) {
522 switchFeatures.setFromDescription(this, description);
523 }
524 }
525
526 @Override
527 @LogMessageDoc(level="ERROR",
528 message="Failed to clear all flows on switch {switch}",
529 explanation="An I/O error occured while trying to clear " +
530 "flows on the switch.",
531 recommendation=LogMessageDoc.CHECK_SWITCH)
532 public void clearAllFlowMods() {
533 // Delete all pre-existing flows
534 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
535 OFMessage fm = ((OFFlowMod) floodlightProvider.getOFMessageFactory()
536 .getMessage(OFType.FLOW_MOD))
537 .setMatch(match)
538 .setCommand(OFFlowMod.OFPFC_DELETE)
539 .setOutPort(OFPort.OFPP_NONE)
540 .setLength(U16.t(OFFlowMod.MINIMUM_LENGTH));
541 try {
542 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
543 msglist.add(fm);
544 channel.write(msglist);
545 } catch (Exception e) {
546 log.error("Failed to clear all flows on switch " + this, e);
547 }
548 }
549
550 @Override
551 public boolean updateBroadcastCache(Long entry, Short port) {
552 if (timedCache.update(entry)) {
553 Long count = portBroadcastCacheHitMap.putIfAbsent(port, new Long(1));
554 if (count != null) {
555 count++;
556 }
557 return true;
558 } else {
559 return false;
560 }
561 }
562
563 @Override
564 @JsonIgnore
565 public Map<Short, Long> getPortBroadcastHits() {
566 return this.portBroadcastCacheHitMap;
567 }
568
569
570 @Override
571 public void flush() {
572 Map<OFSwitchImpl,List<OFMessage>> msg_buffer_map = local_msg_buffer.get();
573 List<OFMessage> msglist = msg_buffer_map.get(this);
574 if ((msglist != null) && (msglist.size() > 0)) {
575 try {
576 this.write(msglist);
577 } catch (IOException e) {
578 // TODO: log exception
579 e.printStackTrace();
580 }
581 msglist.clear();
582 }
583 }
584
585 public static void flush_all() {
586 Map<OFSwitchImpl,List<OFMessage>> msg_buffer_map = local_msg_buffer.get();
587 for (OFSwitchImpl sw : msg_buffer_map.keySet()) {
588 sw.flush();
589 }
590 }
591
592 /**
593 * Return a read lock that must be held while calling the listeners for
594 * messages from the switch. Holding the read lock prevents the active
595 * switch list from being modified out from under the listeners.
596 * @return
597 */
598 @JsonIgnore
599 public Lock getListenerReadLock() {
600 return listenerLock.readLock();
601 }
602
603 /**
604 * Return a write lock that must be held when the controllers modifies the
605 * list of active switches. This is to ensure that the active switch list
606 * doesn't change out from under the listeners as they are handling a
607 * message from the switch.
608 * @return
609 */
610 @JsonIgnore
611 public Lock getListenerWriteLock() {
612 return listenerLock.writeLock();
613 }
614
615 /**
616 * Get the IP Address for the switch
617 * @return the inet address
618 */
619 @JsonSerialize(using=ToStringSerializer.class)
620 public SocketAddress getInetAddress() {
621 return channel.getRemoteAddress();
622 }
623
624 /**
625 * Send NX role request message to the switch requesting the specified role.
626 *
627 * This method should ONLY be called by @see RoleChanger.submitRequest().
628 *
629 * After sending the request add it to the queue of pending request. We
630 * use the queue to later verify that we indeed receive the correct reply.
631 * @param sw switch to send the role request message to
632 * @param role role to request
633 * @param cookie an opaque value that will be stored in the pending queue so
634 * RoleChanger can check for timeouts.
635 * @return transaction id of the role request message that was sent
636 */
637 protected int sendNxRoleRequest(Role role, long cookie)
638 throws IOException {
639 synchronized(pendingRoleRequests) {
640 // Convert the role enum to the appropriate integer constant used
641 // in the NX role request message
642 int nxRole = 0;
643 switch (role) {
644 case EQUAL:
645 nxRole = OFRoleVendorData.NX_ROLE_OTHER;
646 break;
647 case MASTER:
648 nxRole = OFRoleVendorData.NX_ROLE_MASTER;
649 break;
650 case SLAVE:
651 nxRole = OFRoleVendorData.NX_ROLE_SLAVE;
652 break;
653 default:
654 log.error("Invalid Role specified for switch {}."
655 + " Disconnecting.", this);
656 // TODO: should throw an error
657 return 0;
658 }
659
660 // Construct the role request message
661 OFVendor roleRequest = (OFVendor)floodlightProvider.
662 getOFMessageFactory().getMessage(OFType.VENDOR);
663 int xid = this.getNextTransactionId();
664 roleRequest.setXid(xid);
665 roleRequest.setVendor(OFNiciraVendorData.NX_VENDOR_ID);
666 OFRoleRequestVendorData roleRequestData = new OFRoleRequestVendorData();
667 roleRequestData.setRole(nxRole);
668 roleRequest.setVendorData(roleRequestData);
669 roleRequest.setLengthU(OFVendor.MINIMUM_LENGTH +
670 roleRequestData.getLength());
671
672 // Send it to the switch
673 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
674 msglist.add(roleRequest);
675 // FIXME: should this use this.write() in order for messages to
676 // be processed by handleOutgoingMessage()
677 this.channel.write(msglist);
678
679 pendingRoleRequests.add(new PendingRoleRequestEntry(xid, role, cookie));
680 return xid;
681 }
682 }
683
684 /**
685 * Deliver a RoleReply message to this switch. Checks if the reply
686 * message matches the expected reply (head of the pending request queue).
687 * We require in-order delivery of replies. If there's any deviation from
688 * our expectations we disconnect the switch.
689 *
690 * We must not check the received role against the controller's current
691 * role because there's no synchronization but that's fine @see RoleChanger
692 *
693 * Will be called by the OFChannelHandler's receive loop
694 *
695 * @param xid Xid of the reply message
696 * @param role The Role in the the reply message
697 */
698 @LogMessageDocs({
699 @LogMessageDoc(level="ERROR",
700 message="Switch {switch}: received unexpected role reply for " +
701 "Role {role}" +
702 " Disconnecting switch",
703 explanation="The switch sent an unexpected HA role reply",
704 recommendation=HA_CHECK_SWITCH),
705 @LogMessageDoc(level="ERROR",
706 message="Switch {switch}: expected role reply with " +
707 "Xid {xid}, got {xid}. Disconnecting switch",
708 explanation="The switch sent an unexpected HA role reply",
709 recommendation=HA_CHECK_SWITCH),
710 @LogMessageDoc(level="ERROR",
711 message="Switch {switch}: expected role reply with " +
712 "Role {role}, got {role}. Disconnecting switch",
713 explanation="The switch sent an unexpected HA role reply",
714 recommendation=HA_CHECK_SWITCH)
715 })
716 protected void deliverRoleReply(int xid, Role role) {
717 synchronized(pendingRoleRequests) {
718 PendingRoleRequestEntry head = pendingRoleRequests.poll();
719 if (head == null) {
720 // Maybe don't disconnect if the role reply we received is
721 // for the same role we are already in.
722 log.error("Switch {}: received unexpected role reply for Role {}" +
723 " Disconnecting switch", this, role );
724 this.channel.close();
725 }
726 else if (head.xid != xid) {
727 // check xid before role!!
728 log.error("Switch {}: expected role reply with " +
729 "Xid {}, got {}. Disconnecting switch",
730 new Object[] { this, head.xid, xid } );
731 this.channel.close();
732 }
733 else if (head.role != role) {
734 log.error("Switch {}: expected role reply with " +
735 "Role {}, got {}. Disconnecting switch",
736 new Object[] { this, head.role, role } );
737 this.channel.close();
738 }
739 else {
740 log.debug("Received role reply message from {}, setting role to {}",
741 this, role);
742 if (this.role == null && getAttribute(SWITCH_SUPPORTS_NX_ROLE) == null) {
743 // The first role reply we received. Set the attribute
744 // that the switch supports roles
745 setAttribute(SWITCH_SUPPORTS_NX_ROLE, true);
746 }
747 this.role = role;
748 }
749 }
750 }
751
752 /**
753 * Checks whether the given xid matches the xid of the first pending
754 * role request.
755 * @param xid
756 * @return
757 */
758 protected boolean checkFirstPendingRoleRequestXid (int xid) {
759 synchronized(pendingRoleRequests) {
760 PendingRoleRequestEntry head = pendingRoleRequests.peek();
761 if (head == null)
762 return false;
763 else
764 return head.xid == xid;
765 }
766 }
767
768 /**
769 * Checks whether the given request cookie matches the cookie of the first
770 * pending request
771 * @param cookie
772 * @return
773 */
774 protected boolean checkFirstPendingRoleRequestCookie(long cookie) {
775 synchronized(pendingRoleRequests) {
776 PendingRoleRequestEntry head = pendingRoleRequests.peek();
777 if (head == null)
778 return false;
779 else
780 return head.cookie == cookie;
781 }
782 }
783
784 /**
785 * Called if we receive a vendor error message indicating that roles
786 * are not supported by the switch. If the xid matches the first pending
787 * one, we'll mark the switch as not supporting roles and remove the head.
788 * Otherwise we ignore it.
789 * @param xid
790 */
Jonathan Harta95c6d92013-03-18 16:12:27 -0700791 protected Role deliverRoleRequestNotSupported(int xid) {
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800792 synchronized(pendingRoleRequests) {
793 PendingRoleRequestEntry head = pendingRoleRequests.poll();
794 this.role = null;
795 if (head!=null && head.xid == xid) {
796 setAttribute(SWITCH_SUPPORTS_NX_ROLE, false);
Jonathan Harta95c6d92013-03-18 16:12:27 -0700797 return head.role;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800798 }
799 else {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700800 log.debug("Closing {} because a role request error didn't match " +
801 "head of pendingRoleRequests queue", this);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800802 this.channel.close();
Jonathan Harta95c6d92013-03-18 16:12:27 -0700803 return null;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800804 }
805 }
806 }
807
808 @Override
809 public Future<OFFeaturesReply> getFeaturesReplyFromSwitch()
810 throws IOException {
811 OFMessage request = new OFFeaturesRequest();
812 request.setXid(getNextTransactionId());
813 OFFeaturesReplyFuture future =
814 new OFFeaturesReplyFuture(threadPool, this, request.getXid());
815 this.featuresFutureMap.put(request.getXid(), future);
816 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
817 msglist.add(request);
818 this.channel.write(msglist);
819 return future;
820 }
821
822 @Override
823 public void deliverOFFeaturesReply(OFMessage reply) {
824 OFFeaturesReplyFuture future = this.featuresFutureMap.get(reply.getXid());
825 if (future != null) {
826 future.deliverFuture(this, reply);
827 // The future will ultimately unregister itself and call
828 // cancelFeaturesReply
829 return;
830 }
831 log.error("Switch {}: received unexpected featureReply", this);
832 }
833
834 @Override
835 public void cancelFeaturesReply(int transactionId) {
836 this.featuresFutureMap.remove(transactionId);
837 }
838
839
840 @Override
841 public int getBuffers() {
842 return buffers;
843 }
844
845
846 @Override
847 public int getActions() {
848 return actions;
849 }
850
851
852 @Override
853 public int getCapabilities() {
854 return capabilities;
855 }
856
857
858 @Override
859 public byte getTables() {
860 return tables;
861 }
Umesh Krishnaswamyf962d642013-01-23 19:04:23 -0800862
863
864 @Override
865 public void setupRemoteSwitch(Long dpid) {
866 this.datapathId = dpid;
867 this.stringId = HexString.toHexString(this.datapathId);
868 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800869}