blob: b5e997f4a88ca7be429f485e5daa0169595f5ba6 [file] [log] [blame]
Jordan Haltermanddc2b182017-07-23 01:27:47 -07001#!/usr/bin/env python
2"""
3usage: onos-dist-verify [-h] [-n PRIMITIVE_NAME] [-p PARALLELISM]
4 [-c OPERATION_COUNT]
5 [-od OPERATION_DELAY OPERATION_DELAY]
6 [-d [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} ...]]]
7 [-dd DISRUPTOR_DELAY DISRUPTOR_DELAY]
8
9Perform random operations on a distributed primitive using multiple concurrent
10processes and verify the linearizability of the history.
11
12optional arguments:
13 -h, --help show this help message and exit
14 -n PRIMITIVE_NAME, --primitive-name PRIMITIVE_NAME
15 Name of the AtomicValue primitive to test. Defaults to
16 'test'
17 -p PARALLELISM, --parallelism PARALLELISM
18 Number of parallel processes with which to test.
19 Defaults to 8
20 -c OPERATION_COUNT, --operation-count OPERATION_COUNT
21 Number of operations to execute per process. Defaults
22 to 50 operations per process
23 -od OPERATION_DELAY OPERATION_DELAY, --operation-delay OPERATION_DELAY OPERATION_DELAY
24 Uniform random delay to wait between operations.
25 Defaults to 1-5 seconds per operation
26 -d [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} ...]], --disruptors [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} ...]]
27 List of disruptor functions to run. Disruptors will be
28 run periodically according to the configured
29 --disruptor-delay. Defaults to 'partition_random'
30 -dd DISRUPTOR_DELAY DISRUPTOR_DELAY, --disruptor-delay DISRUPTOR_DELAY DISRUPTOR_DELAY
31 Uniform random delay to wait between disruptor
32 functions. Defaults to 15-30 seconds between disruptors
33"""
34import argparse
35import random
36import re
37import sys
38import threading
39import time
40import os
41import subprocess
42from collections import OrderedDict
43from tempfile import NamedTemporaryFile
44from abc import ABCMeta, abstractmethod
45
# The edn_format module is required to serialize the recorded history, so bail
# out early with an installation hint when it cannot be imported.
try:
    import edn_format
except ImportError:
    # Parenthesized single-argument print behaves the same on Python 2 and 3.
    print("No edn_format module found; use 'pip install edn_format'")
    sys.exit(1)

# Shared write-only handle on the null device, used to discard subprocess output.
DEVNULL = open(os.devnull, 'w')
54
55
def run_test(name, processes, operation_count, operation_delay, disruptors, disruptor_delay):
    """Runs the linearizability test.

    The test nodes are read from the OC<n> environment variables (OC1, OC2, ...). The primitive is reset, the
    processes and the disruptor are started, and once they have all finished the recorded history is verified.

    Args:
        name: Name of the primitive to test.
        processes: Number of concurrent test processes to start.
        operation_count: Number of operations each process executes.
        operation_delay: Two-element sequence giving the bounds of the random delay between operations.
        disruptors: Names of the Disruptor methods to run.
        disruptor_delay: Two-element sequence giving the bounds of the random delay between disruptors.

    Exits the interpreter with status 1 when no OC<n> environment variable is defined.
    """

    def get_nodes():
        """Parses environment variables to get a sorted list of test nodes."""
        def natural_key(key):
            # Split digits from letters so that OC10 sorts after OC2.
            return [int(part) if part.isdigit() else part.lower() for part in re.split('([0-9]+)', key)]

        # Anchor the pattern at both ends so that only OC<n> matches, not e.g. OC1_SOMETHING.
        names = [var for var in os.environ if re.match(r"OC[0-9]+\Z", var)]
        return [os.environ[var] for var in sorted(names, key=natural_key)]

    # Parse the environment variables to get the list of test nodes.
    nodes = get_nodes()
    if not nodes:
        print("No test nodes found; define the OC1, OC2, ... environment variables")
        sys.exit(1)

    # Initialize the test by resetting the distributed value.
    _init_test(name, nodes)

    # Create a history object with which to track history
    history = History()
    disruptor = Disruptor(nodes, disruptors, disruptor_delay, history)

    # Each process is pinned to a randomly chosen node; process ids start at 1.
    runners = [
        Process(i + 1, name, operation_count, operation_delay, random.choice(nodes), history)
        for i in range(processes)
    ]

    # Start the test.
    _start_test(disruptor, runners)

    # Run the disruptor and processes until complete.
    _block_until_complete(disruptor, runners)

    # Check the recorded history for linearizability.
    _verify_test(history)
84
85
def _init_test(name, nodes):
    """Initializes the test by resetting the test value.

    Runs 'onos <node> value-test <name> set null' against the first node in the list.

    Args:
        name: Name of the primitive to reset.
        nodes: List of test nodes; only the first one is used.

    Exits the interpreter with status 1 when the node list is empty, when the command cannot be launched, or when
    it returns a non-zero exit status.
    """
    if not nodes:
        print("No test nodes found; cannot reset test data")
        sys.exit(1)

    try:
        subprocess.check_call(['onos', nodes[0], 'value-test', name, 'set', 'null'])
    except (subprocess.CalledProcessError, OSError):
        # OSError covers the case where the 'onos' executable cannot be started at all.
        print("Failed to reset test data")
        sys.exit(1)
94
95
def _start_test(disruptor, processes):
    """Starts every process, in order, and then the disruptor."""
    # The disruptor is appended last so that it is started after all processes.
    for runnable in list(processes) + [disruptor]:
        runnable.start()
101
102
def _block_until_complete(disruptor, processes):
    """Blocks until all processes and then the disruptor have finished.

    Args:
        disruptor: Object exposing stop() and is_running().
        processes: Iterable of objects exposing is_running().
    """
    # Poll once per second until no process is running any more.
    while any(process.is_running() for process in processes):
        time.sleep(1)

    # Once all processes have completed, stop the disruptor.
    disruptor.stop()

    # Wait for the disruptor thread to complete to ensure partitions are healed and crashed nodes are recovered.
    while disruptor.is_running():
        time.sleep(1)
117
118
def _verify_test(history):
    """Checks the given history for linearizability.

    The history is written to a temporary file and handed to the Knossos model checker, which is run through
    'lein run --model cas-register <file>' inside the directory named by the KNOSSOS_PATH environment variable.

    Args:
        history: Object whose str() is the history to check.

    Returns:
        None when KNOSSOS_PATH is not defined (the check is skipped). Otherwise the function does not return: it
        exits the interpreter with status 0 when the last token of the checker output is 'true', with status 1 when
        it is not or when the checker cannot be launched, and with the checker's own exit status when that status
        is non-zero.
    """
    try:
        knossos_path = os.environ['KNOSSOS_PATH']
    except KeyError:
        print("KNOSSOS_PATH is not defined; skipping model checker")
        return

    # Create and write a temporary file to be passed to the Knossos model checker. delete=False keeps the file
    # on disk after the 'with' block so that the checker can read it.
    with NamedTemporaryFile(mode='w+', delete=False) as f:
        f.write(str(history))
        file_name = f.name

    # Run the Knossos model checker and parse the output to determine whether the history is linearizable.
    try:
        output = subprocess.check_output(
            ['lein', 'run', '--model', 'cas-register', file_name],
            cwd=knossos_path,
            universal_newlines=True  # text output on Python 3 as well, so the comparison below works
        )
        tokens = output.split()
        if tokens and tokens[-1] == 'true':
            print("\rHistory is linearizable! :-)")
            exitcode = 0
        else:
            print("\rHistory is not linearizable. :-(")
            exitcode = 1
    except subprocess.CalledProcessError as e:
        exitcode = e.returncode
    except OSError as e:
        # Raised when 'lein' or the KNOSSOS_PATH directory cannot be used to start the checker.
        print("Failed to run the Knossos model checker: %s" % (e,))
        exitcode = 1
    finally:
        # Remove the temporary file before exiting; a failed removal must not hide the result.
        try:
            os.remove(file_name)
        except OSError:
            pass
    sys.exit(exitcode)
150
151
def call(*args, **kwargs):
    """Executes a command, awaiting the completion of the command with an optional timeout.

    If a timeout is specified, a timer thread waits for the command to complete or kills the command once the timeout
    expires. Output from the subprocess is directed to os.devnull.

    Args:
        *args: Positional arguments passed through to subprocess.Popen.
        **kwargs: Keyword arguments passed through to subprocess.Popen, except 'timeout' (seconds), which is
            consumed here.

    Returns:
        The return code of the command, whether or not a timeout was given.
    """
    timeout = kwargs.pop('timeout', None)
    process = subprocess.Popen(*args, stdout=DEVNULL, stderr=DEVNULL, **kwargs)

    # Without a timeout there is nothing to arm; still wait so the caller gets the return code.
    if timeout is None:
        return process.wait()

    timer = threading.Timer(timeout, process.kill)
    try:
        timer.start()
        return process.wait()
    finally:
        # The command finished (or was killed); make sure the timer cannot fire later.
        timer.cancel()
167
168
def get_output(*args, **kwargs):
    """Executes a command, awaiting the output of the command with an optional timeout.

    If a timeout is specified, a timer thread waits for the command to complete or kills the command once the timeout
    expires. stderr output from the subprocess is directed to os.devnull. stdout is returned.

    Raises:
        subprocess.CalledProcessError: If the command finishes with a non-zero return code; the captured stdout
            is attached as the 'output' attribute.
    """
    timeout = kwargs.pop('timeout', None)
    process = subprocess.Popen(stdout=subprocess.PIPE, stderr=DEVNULL, *args, **kwargs)

    # A watchdog timer exists only when a timeout was requested.
    watchdog = None
    if timeout is not None:
        watchdog = threading.Timer(timeout, lambda p: p.kill(), [process])

    try:
        if watchdog is not None:
            watchdog.start()

        captured = process.communicate()[0]
        status = process.poll()
        if not status:
            return captured

        # Report the command the same way it was handed to Popen: the 'args' keyword or the first positional.
        command = kwargs.get("args")
        if command is None:
            command = args[0]
        raise subprocess.CalledProcessError(status, command, output=captured)
    finally:
        if watchdog is not None:
            watchdog.cancel()
197
198
class History(object):
    """Records and logs the history of operations.

    This object directly mimics the format expected by the Knossos linearizability checker. Events are logged in
    edn format, and str(history) will return the full history in edn format.
    """
    def __init__(self):
        # Recorded entries, in the order in which record() was called.
        self.entries = []

    def record(self, entry):
        """Records an entry in the history and echoes it to stdout."""
        self.entries.append(entry)
        # Parenthesized single-argument print behaves the same on Python 2 and 3.
        print(str(entry).strip() + '\r')

    def __str__(self):
        """Returns all recorded entries serialized as a single edn value."""
        return edn_format.dumps([entry.format() for entry in self.entries])
215
216
# Deriving from a class created by calling ABCMeta applies the metaclass on both Python 2 and Python 3; a
# '__metaclass__' attribute is ignored by Python 3, which would leave @abstractmethod unenforced there.
class HistoryEntry(ABCMeta('_AbstractHistoryEntry', (object,), {})):
    """History entry.

    Abstract base for entries recorded in a History. Subclasses implement format().
    """

    @abstractmethod
    def format(self):
        """Returns the entry in EDN format."""

    def __str__(self):
        """Returns the formatted entry serialized with edn_format.dumps."""
        return edn_format.dumps(self.format())
227
228
class ProcessEntry(HistoryEntry):
    """History entry describing one event of one process operation."""
    def __init__(self, process, action, operation, *values):
        self.process, self.action = process, action
        self.operation, self.values = operation, values

    def format(self):
        """Returns an ordered mapping with the :process, :type, :f and :value keys."""
        keyword = edn_format.Keyword

        # A single value is stored bare; any other number of values is stored as a list.
        if len(self.values) == 1:
            value = self.values[0]
        else:
            value = list(self.values)

        pairs = [
            (keyword('process'), self.process),
            (keyword('type'), keyword(self.action)),
            (keyword('f'), keyword(self.operation)),
            (keyword('value'), value),
        ]
        return OrderedDict(pairs)
244
245
class DisruptorEntry(HistoryEntry):
    """History entry describing a disruptor event."""
    def __init__(self, event, message):
        self.event, self.message = event, message

    def format(self):
        """Returns an ordered mapping with the :process, :type, :f and :value keys."""
        keyword = edn_format.Keyword

        # Disruptor events are always attributed to the :disruptor process with type :info.
        entry = OrderedDict()
        entry[keyword('process')] = keyword('disruptor')
        entry[keyword('type')] = keyword('info')
        entry[keyword('f')] = keyword(self.event)
        entry[keyword('value')] = self.message
        return entry
259
260
# Deriving from a class created by calling ABCMeta applies the metaclass on both Python 2 and Python 3; a
# '__metaclass__' attribute is ignored by Python 3, which would leave @abstractmethod unenforced there.
class Runnable(ABCMeta('_AbstractRunnable', (object,), {})):
    """Base class for managing the lifecycle of a threaded test process."""

    def __init__(self):
        # Thread executing run(); None until start() is called.
        self.thread = None
        # Cooperative flag: set by start(), cleared by stop().
        self.running = False

    def start(self):
        """Starts the runnable thread."""
        self.thread = threading.Thread(target=self.run)
        self.thread.daemon = True
        self.running = True
        self.thread.start()

    @abstractmethod
    def run(self):
        """Runs the thread. This method should be overridden by implementors."""

    def is_running(self):
        """Returns a boolean indicating whether the runnable is running.

        The runnable counts as running while its 'running' flag is set or while its thread is alive. A runnable
        that was never started is not running.
        """
        return self.running or (self.thread is not None and self.thread.is_alive())

    def stop(self):
        """Stops the runnable thread.

        Calling this method will not immediately stop the thread. Instead, a flag will be set, and the run() method
        is expected to exit according to the 'running' flag. Use 'is_running()' to determine whether the thread is
        stopped and has exited.
        """
        self.running = False
292
293
class Process(Runnable):
    """Test runner for a single process.

    A process simulates operations from a single actor in the cluster. When the process is started, it will begin
    performing random read, write, or cas operations, sleeping for random intervals between operations. Each operation
    performed by the process will be logged in the History object provided to the constructor. The process runs for a
    predefined number of operations or until an operation fails with an unknown error (e.g. a timeout).
    """
    def __init__(self, id, name, operation_count, delay, node, history):
        super(Process, self).__init__()
        self.id = id
        self.name = name
        self.operation_count = operation_count
        self.delay = delay
        self.node = node
        self.history = history
        self.operations = (self.read, self.write, self.cas, self.read_and_cas)
        # Last value this process read or successfully wrote; used as the expected value of cas operations.
        self.value = None

    def run(self):
        """Runs the process."""
        try:
            for _ in range(self.operation_count):
                self._wait()
                self._run()
                if not self.running:
                    break
        finally:
            # Clear the flag even when an operation raises unexpectedly; otherwise is_running() would report
            # True forever and anything polling it would never finish.
            self.stop()

    def _run(self):
        """Runs a random operation."""
        return random.choice(self.operations)()

    def _wait(self):
        """Blocks for a uniform random delay according to the process configuration."""
        time.sleep(random.uniform(self.delay[0], self.delay[1]))

    def _next_value(self):
        """Returns the next random value to set."""
        return random.randint(1, 10)

    def _command(self, *arguments):
        """Runs 'onos <node> value-test <name> <arguments...>' with a 5 second timeout and returns its stripped stdout."""
        return get_output(['onos', self.node, 'value-test', self.name] + list(arguments), timeout=5).strip()

    def _log(self, action, operation, *values):
        """Logs an operation."""
        self.history.record(ProcessEntry(self.id, action, operation, *values))

    def _invoke(self, operation, *values):
        """Logs an operation invocation event in the process history."""
        self._log('invoke', operation, *values)

    def _ok(self, operation, *values):
        """Logs an operation success event in the process history."""
        self._log('ok', operation, *values)
        return True

    def _fail(self, operation, *values):
        """Logs an operation failure event in the process history."""
        self._log('fail', operation, *values)
        return True

    def _info(self, operation, *values):
        """Logs an operation info event in the process history and stops the process."""
        self._log('info', operation, *values)
        self.stop()
        return False

    def read(self):
        """Executes a read operation.

        Returns True when the read succeeded, or False when the command failed or printed something that is
        neither 'null' nor an integer (in which case the process is stopped).
        """
        self._invoke('read', None)
        try:
            output = self._command('get')
            value = None if output == 'null' else int(output)
        except (subprocess.CalledProcessError, ValueError):
            # ValueError: the output could not be parsed, so the outcome of the read is unknown.
            return self._info('read', None)
        self.value = value
        return self._ok('read', value)

    def write(self):
        """Executes a write operation."""
        next_value = self._next_value()
        self._invoke('write', next_value)
        try:
            self._command('set', str(next_value))
        except subprocess.CalledProcessError:
            return self._info('write', next_value)
        self.value = next_value
        return self._ok('write', next_value)

    def cas(self):
        """Executes a check-and-set operation.

        The expected value is the last value known to this process; 'null' is sent when there is none.
        """
        current_value, next_value = self.value, self._next_value()
        self._invoke('cas', current_value, next_value)
        try:
            output = self._command(
                'compareAndSet',
                str(current_value) if current_value is not None else 'null',
                str(next_value)
            )
        except subprocess.CalledProcessError:
            return self._info('cas', current_value, next_value)

        if output == 'true':
            self.value = next_value
            return self._ok('cas', current_value, next_value)
        if output == 'false':
            return self._fail('cas', current_value, next_value)
        # Anything else leaves the outcome of the operation unknown.
        return self._info('cas', current_value, next_value)

    def read_and_cas(self):
        """Executes a read to get the current value and then a check-and-set operation."""
        if self.read():
            self.cas()
422
423
class Disruptor(Runnable):
    """Cluster disruptor runner.

    The disruptor periodically disrupts the cluster using a random disruptor function to e.g. partition the network,
    crash a node, or slow communication within the network. The disruptor guarantees that only one disruptor function
    will run at any given time and the previous disruptor will be healed prior to the next disruptor beginning.
    The disruptor sleeps for a uniform random interval between disruptor functions.
    """
    def __init__(self, nodes, disruptors, delay, history):
        super(Disruptor, self).__init__()
        self.nodes = nodes
        self.delay = delay
        self.history = history
        # Bound methods resolved from the given disruptor names.
        self.disruptors = []
        for name in disruptors:
            try:
                self.disruptors.append(getattr(self, name))
            except AttributeError:
                print("Unknown disruptor %s" % (name,))
                sys.exit(1)

    def run(self):
        """Runs the disruptor until stopped."""
        while self.running:
            self._wait()
            if self.running:
                self._run()

    def _run(self):
        """Runs a random disruptor, or does nothing when no disruptor is configured."""
        if not self.disruptors:
            return
        random.choice(self.disruptors)()

    def _wait(self):
        """Waits for a uniform random delay."""
        time.sleep(random.uniform(self.delay[0], self.delay[1]))

    def _random_node(self):
        """Returns a random node on which to perform an operation."""
        return random.choice(self.nodes)

    def _log(self, event, message):
        """Logs an event in the disruptor history."""
        self.history.record(DisruptorEntry(event, message))

    def _start(self, message):
        """Logs a start event in the disruptor history."""
        self._log('start', message)

    def _stop(self, message):
        """Logs a stop event in the disruptor history."""
        self._log('stop', message)

    def _disrupt(self, *args):
        """Executes a disruptor via the onos-disrupt utility."""
        call(['onos-disrupt'] + list(args), timeout=5)

    def _partition(self, node1, node2):
        """Partitions node1 from node2."""
        self._disrupt(node1, 'partition', node2)

    def _partition_halves(self):
        """Partitions the cluster into two halves."""
        self._disrupt('partition-halves')

    def _partition_bridge(self, node):
        """Partitions the cluster with the given node as a bridge between two halves."""
        self._disrupt(node, 'partition-bridge')

    def _heal(self, node1=None, node2=None):
        """Heals a partition between two nodes or between all nodes if the given nodes are None."""
        if node1 is not None and node2 is not None:
            self._disrupt(node1, 'heal', node2)
        else:
            self._disrupt('heal')

    def _crash(self, node):
        """Crashes the given node."""
        self._disrupt(node, 'crash')

    def _recover(self, node):
        """Recovers the given node from a crash."""
        self._disrupt(node, 'recover')

    def _delay(self, node=None):
        """Delays communication from all nodes or from the given node if specified."""
        if node is not None:
            self._disrupt(node, 'delay')
        else:
            self._disrupt('delay')

    def _restore(self, node=None):
        """Restores communication on all nodes or on the given node if specified."""
        if node is not None:
            self._disrupt(node, 'restore')
        else:
            self._disrupt('restore')

    def partition_random(self):
        """Partitions two random nodes from each other.

        Does nothing when fewer than two distinct nodes are available, since no pair can be chosen.
        """
        candidates = sorted(set(self.nodes))
        if len(candidates) < 2:
            return
        node1, node2 = random.sample(candidates, 2)
        self._start("Cut off %s->%s" % (node1, node2))
        self._partition(node1, node2)
        self._wait()
        self._heal(node1, node2)
        self._stop("Fully connected")

    def partition_halves(self):
        """Partitions the cluster into two halves."""
        self._start("Partitioning network into two halves")
        self._partition_halves()
        self._wait()
        self._heal()
        self._stop("Fully connected")

    def partition_bridge(self):
        """Partitions the cluster into two halves with a bridge between them."""
        node = self._random_node()
        self._start("Partitioning network with bridge %s" % (node,))
        self._partition_bridge(node)
        self._wait()
        self._heal()
        self._stop("Fully connected")

    def crash_random(self):
        """Crashes a random node."""
        node = self._random_node()
        self._start("Crashing %s" % (node,))
        self._crash(node)
        self._wait()
        self._recover(node)
        self._stop("Recovered %s" % (node,))

    def delay(self):
        """Delays messages on all nodes."""
        self._start("Delay communication on all nodes")
        self._delay()
        self._wait()
        self._restore()
        self._stop("Communication restored")

    def delay_random(self):
        """Delays communication on a random node."""
        node = self._random_node()
        self._start("Delay communication on %s" % (node,))
        self._delay(node)
        self._wait()
        self._restore(node)
        self._stop("Communication restored on %s" % (node,))
575
576
if __name__ == '__main__':
    cli = argparse.ArgumentParser(
        description="""
        Perform random operations on a distributed primitive using multiple concurrent
        processes and verify the linearizability of the history.
        """
    )

    # Each row holds the option flags and the keyword arguments for add_argument; the options are registered in
    # this order, which is also the order in which they are listed in the help output.
    option_table = [
        (('-n', '--primitive-name'), dict(
            type=str,
            default='test',
            help="Name of the AtomicValue primitive to test. Defaults to 'test'"
        )),
        (('-p', '--parallelism'), dict(
            type=int,
            default=8,
            help="Number of parallel processes with which to test. Defaults to 8"
        )),
        (('-c', '--operation-count'), dict(
            type=int,
            default=50,
            help="Number of operations to execute per process. Defaults to 50 operations per process"
        )),
        (('-od', '--operation-delay'), dict(
            type=int,
            nargs=2,
            default=[1, 5],
            help="Uniform random delay to wait between operations. Defaults to 1-5 seconds per operation"
        )),
        (('-d', '--disruptors'), dict(
            type=str,
            nargs='*',
            default=['partition_random'],
            choices=['partition_random', 'partition_halves', 'partition_bridge', 'crash_random', 'delay', 'delay_random'],
            help="""
            List of disruptor functions to run. Disruptors will be run periodically
            according to the configured --disruptor-delay. Defaults to 'partition_random'
            """
        )),
        (('-dd', '--disruptor-delay'), dict(
            type=int,
            nargs=2,
            default=[15, 30],
            help="Uniform random delay to wait between disruptor functions.Defaults to 15-30 seconds between disruptors"
        )),
    ]
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)

    options = cli.parse_args()

    # Exit with status 1 when the run is interrupted from the keyboard.
    try:
        run_test(
            options.primitive_name,
            options.parallelism,
            options.operation_count,
            options.operation_delay,
            options.disruptors,
            options.disruptor_delay
        )
    except KeyboardInterrupt:
        sys.exit(1)
646 sys.exit(1)