blob: f0648beadfd603ec784198adb4b159b9f4387df0 [file] [log] [blame]
Rich Lane2ee3d162015-05-07 13:53:05 -07001:: # Copyright 2015, Big Switch Networks, Inc.
2:: #
3:: # LoxiGen is licensed under the Eclipse Public License, version 1.0 (EPL), with
4:: # the following special exception:
5:: #
6:: # LOXI Exception
7:: #
8:: # As a special exception to the terms of the EPL, you may distribute libraries
9:: # generated by LoxiGen (LoxiGen Libraries) under the terms of your choice, provided
10:: # that copyright and licensing notices generated by LoxiGen are not altered or removed
11:: # from the LoxiGen Libraries and the notice provided below is (i) included in
12:: # the LoxiGen Libraries, if distributed in source code form and (ii) included in any
13:: # documentation for the LoxiGen Libraries, if distributed in binary form.
14:: #
15:: # Notice: "Copyright 2013, Big Switch Networks, Inc. This library was generated by the LoxiGen Compiler."
16:: #
17:: # You may not use this file except in compliance with the EPL or LOXI Exception. You may obtain
18:: # a copy of the EPL at:
19:: #
20:: # http://www.eclipse.org/legal/epl-v10.html
21:: #
22:: # Unless required by applicable law or agreed to in writing, software
23:: # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
24:: # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
25:: # EPL for the specific language governing permissions and limitations
26:: # under the EPL.
27::
28# Copyright 2015, Big Switch Networks, Inc.
29
30"""
OpenFlow connection class

This class creates a thread which continually parses OpenFlow messages off the
supplied socket and places them in a queue. The class has methods for reading messages
from the RX queue, sending messages, and higher level operations like request-response
and multipart transactions.
37"""
38
39import loxi
40import loxi.of14
41import logging
42import time
43import socket
44import errno
45import os
46import select
47from threading import Condition, Lock, Thread
48
49DEFAULT_TIMEOUT = 1
50
class TransactionError(Exception):
    """
    Raised when a request/reply transaction does not complete normally.

    Constructed as TransactionError(text, msg): 'text' is the human-readable
    description and 'msg' is the reply that caused the failure, or None when
    no reply was received.
    """

    def __str__(self):
        # Tolerate a missing or non-string description so that reporting the
        # error can never itself raise IndexError/TypeError.
        if not self.args:
            return ""
        return str(self.args[0])

    @property
    def msg(self):
        """
        The reply attached to this error, or None if none was supplied
        """
        if len(self.args) > 1:
            return self.args[1]
        return None
58
class Connection(Thread):
    """
    Thread that parses OpenFlow messages off a socket into an RX queue.

    The thread body (run) waits on the socket and on an internal wakeup pipe.
    Parsed messages are appended to 'rx' under the 'rx_cv' condition; the
    recv* methods remove messages from that queue, and the send/transact
    methods write to the socket under 'tx_lock'.
    """

    # Size in bytes of the fixed OpenFlow header that process_read requires
    # before it attempts to parse a message.
    _HEADER_LEN = 8

    def __init__(self, sock):
        """
        Wrap the connected socket 'sock'; the thread is not started here
        """
        Thread.__init__(self)
        self.sock = sock
        self.logger = logging.getLogger("connection")
        self.rx = []
        self.rx_cv = Condition()
        self.tx_lock = Lock()
        self.next_xid = 1
        # Pipe used by stop() to wake the select() call in run()
        self.wakeup_rd, self.wakeup_wr = os.pipe()
        self.finished = False
        # Bytes of an incomplete message left over from the previous read
        self.read_buffer = None

    def run(self):
        """
        Event loop: read from the socket until stop() sets 'finished'
        """
        watched = [self.sock, self.wakeup_rd]
        while not self.finished:
            rd, _, _ = select.select(watched, [], [])
            if self.sock in rd and self.process_read() is False:
                # The stream ended or is unusable. A closed socket stays
                # readable forever, so stop polling it to avoid spinning;
                # the thread keeps running until stop() is called.
                watched.remove(self.sock)
            if self.wakeup_rd in rd:
                os.read(self.wakeup_rd, 1)
        self.logger.debug("Exited event loop")

    def process_read(self):
        """
        Read once from the socket and queue every complete message.

        Incomplete trailing data is kept in 'read_buffer' for the next call.
        Returns False when no further reads should be attempted (the peer
        closed the connection or sent an invalid header length), True
        otherwise.
        """
        recvd = self.sock.recv(4096)
        if not recvd:
            # recv() returning no data means the peer closed the connection
            self.logger.debug("Connection closed by peer")
            return False

        self.logger.debug("Received %d bytes", len(recvd))

        if self.read_buffer:
            buf = self.read_buffer + recvd
        else:
            buf = recvd

        offset = 0
        # Stop as soon as there is not enough data for the OpenFlow header
        while offset + self._HEADER_LEN <= len(buf):
            # Parse the header to get type
            hdr_version, hdr_type, hdr_msglen, hdr_xid = \
                loxi.of14.message.parse_header(buf[offset:])

            if hdr_msglen < self._HEADER_LEN:
                # A message cannot be shorter than its own header, and the
                # offset could not advance past it; the stream cannot be
                # resynchronized, so drop it.
                self.logger.error("Invalid message length %d", hdr_msglen)
                self.read_buffer = None
                return False

            # Use loxi to resolve ofp of matching version
            ofp = loxi.protocol(hdr_version)

            # Extract the raw message bytes
            end = offset + hdr_msglen
            if end > len(buf):
                # Not enough data for the body
                break
            rawmsg = buf[offset:end]
            offset = end

            msg = ofp.message.parse_message(rawmsg)
            if not msg:
                self.logger.warning("Could not parse message")
                continue

            self.logger.debug("Received message %s.%s xid %d length %d",
                              type(msg).__module__, type(msg).__name__,
                              hdr_xid, hdr_msglen)

            with self.rx_cv:
                self.rx.append(msg)
                self.rx_cv.notify_all()

        if offset == len(buf):
            self.read_buffer = None
        else:
            self.read_buffer = buf[offset:]
            self.logger.debug("%d bytes remaining", len(self.read_buffer))
        return True

    def recv(self, predicate, timeout=DEFAULT_TIMEOUT):
        """
        Remove and return the first message in the RX queue for
        which 'predicate' returns true

        Waits up to 'timeout' seconds for such a message and returns None
        if none arrives in time.
        """
        assert self.is_alive()

        deadline = time.time() + timeout
        while True:
            with self.rx_cv:
                for i, msg in enumerate(self.rx):
                    if predicate(msg):
                        return self.rx.pop(i)

                now = time.time()
                if now > deadline:
                    return None
                # Woken by process_read() on new messages or by the timeout
                self.rx_cv.wait(deadline - now)

    def recv_any(self, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue
        """
        return self.recv(lambda msg: True, timeout)

    def recv_xid(self, xid, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue with XID 'xid'
        """
        return self.recv(lambda msg: msg.xid == xid, timeout)

    def recv_class(self, klass, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue which is an instance of 'klass'
        """
        return self.recv(lambda msg: isinstance(msg, klass), timeout)

    def _sendall(self, buf):
        """
        Write 'buf' to the socket while holding the TX lock
        """
        with self.tx_lock:
            if self.sock.sendall(buf) is not None:
                raise RuntimeError("failed to send message to switch")

    def send_raw(self, buf):
        """
        Send raw bytes on the socket
        """
        assert self.is_alive()
        self.logger.debug("Sending raw message length %d", len(buf))
        self._sendall(buf)

    def send(self, msg):
        """
        Send a message

        A fresh XID is assigned to 'msg' if it does not already have one.
        """
        assert self.is_alive()

        if msg.xid is None:
            msg.xid = self._gen_xid()
        buf = msg.pack()
        self.logger.debug("Sending message %s.%s xid %d length %d",
                          type(msg).__module__, type(msg).__name__,
                          msg.xid, len(buf))
        self._sendall(buf)

    def transact(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a message and return the reply

        Raises TransactionError if no reply with the same XID arrives within
        'timeout' or if the reply is an error message.
        """
        self.send(msg)
        reply = self.recv_xid(msg.xid, timeout)
        if reply is None:
            raise TransactionError("no reply for %s" % type(msg).__name__, None)
        elif isinstance(reply, loxi.protocol(reply.version).message.error_msg):
            raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
        return reply

    def transact_multipart_generator(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a multipart request and yield each entry from the replies

        Raises TransactionError if a reply does not arrive within 'timeout'
        or is not a stats reply.
        """
        self.send(msg)
        finished = False
        while not finished:
            reply = self.recv_xid(msg.xid, timeout)
            if reply is None:
                raise TransactionError("no reply for %s" % type(msg).__name__, None)
            ofp = loxi.protocol(reply.version)
            if not isinstance(reply, ofp.message.stats_reply):
                raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
            for entry in reply.entries:
                yield entry
            # The last reply of the sequence has the "more" flag cleared
            finished = (reply.flags & ofp.OFPSF_REPLY_MORE) == 0

    def transact_multipart(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a multipart request and return all entries from the replies
        """
        return list(self.transact_multipart_generator(msg, timeout))

    def stop(self):
        """
        Signal the thread to exit and wait for it
        """
        assert not self.finished
        self.logger.debug("Stopping connection")
        self.finished = True
        # Wake the select() in run(); must be bytes so that the write is
        # accepted on Python 3 as well as Python 2.
        os.write(self.wakeup_wr, b"x")
        self.join()
        self.sock.close()
        os.close(self.wakeup_rd)
        os.close(self.wakeup_wr)
        self.logger.debug("Stopped connection")

    def _gen_xid(self):
        """
        Return the next unused XID and advance the counter
        """
        xid = self.next_xid
        self.next_xid += 1
        return xid
247
def connect(ip, port=6653, daemon=True, ofp=loxi.of14):
    """
    Actively connect to a switch

    Opens a TCP connection to (ip, port), starts a Connection thread on it
    (with its 'daemon' flag set from 'daemon'), sends a HELLO built from the
    'ofp' protocol module and waits for the switch's HELLO.

    Returns the started Connection. Raises Exception if no HELLO is
    received; socket errors from connecting propagate unchanged. On any
    failure the socket (and the reader thread, once started) is released
    before the exception propagates.
    """
    soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        soc.connect((ip, port))
        soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
    except Exception:
        # Do not leak the descriptor when the connection cannot be set up
        soc.close()
        raise

    cxn = Connection(soc)
    cxn.daemon = daemon
    cxn.logger.debug("Connected to %s:%d", ip, port)
    cxn.start()

    try:
        cxn.send(ofp.message.hello())
        if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
            raise Exception("Did not receive HELLO")
    except Exception:
        # Shut down the reader thread and close the socket before reporting
        # the failed handshake to the caller.
        cxn.stop()
        raise

    return cxn