blob: b5e997f4a88ca7be429f485e5daa0169595f5ba6 [file] [log] [blame]
#!/usr/bin/env python
"""
usage: onos-dist-verify [-h] [-n PRIMITIVE_NAME] [-p PARALLELISM]
[-c OPERATION_COUNT]
[-od OPERATION_DELAY OPERATION_DELAY]
[-d [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} ...]]]
[-dd DISRUPTOR_DELAY DISRUPTOR_DELAY]
Perform random operations on a distributed primitive using multiple concurrent
processes and verify the linearizability of the history.
optional arguments:
-h, --help show this help message and exit
-n PRIMITIVE_NAME, --primitive-name PRIMITIVE_NAME
Name of the AtomicValue primitive to test. Defaults to
'test'
-p PARALLELISM, --parallelism PARALLELISM
Number of parallel processes with which to test.
Defaults to 8
-c OPERATION_COUNT, --operation-count OPERATION_COUNT
Number of operations to execute per process. Defaults
to 50 operations per process
-od OPERATION_DELAY OPERATION_DELAY, --operation-delay OPERATION_DELAY OPERATION_DELAY
Uniform random delay to wait between operations.
Defaults to 1-5 seconds per operation
-d [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} ...]], --disruptors [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} ...]]
List of disruptor functions to run. Disruptors will be
run periodically according to the configured
--disruptor-delay. Defaults to 'partition_random'
-dd DISRUPTOR_DELAY DISRUPTOR_DELAY, --disruptor-delay DISRUPTOR_DELAY DISRUPTOR_DELAY
Uniform random delay to wait between disruptor
functions. Defaults to 15-30 seconds between disruptors
"""
import argparse
import random
import re
import sys
import threading
import time
import os
import subprocess
from collections import OrderedDict
from tempfile import NamedTemporaryFile
from abc import ABCMeta, abstractmethod
# The edn_format module is required to run tests
try:
import edn_format
except ImportError:
print "No edn_format module found; use 'pip install edn_format'"
sys.exit(1)
# Shared write-only handle used to discard subprocess output (see call() and get_output()).
DEVNULL = open(os.path.devnull, 'w')
def run_test(name, processes, operation_count, operation_delay, disruptors, disruptor_delay):
    """Runs the linearizability test.

    Test nodes are read from the OC<N> environment variables. The distributed value is reset, `processes`
    concurrent test processes plus one disruptor are started and run to completion, and the recorded history is
    then checked for linearizability.

    :param name: name of the AtomicValue primitive to test
    :param processes: number of concurrent test processes to run
    :param operation_count: number of operations each process performs
    :param operation_delay: (min, max) seconds to wait between a process's operations
    :param disruptors: names of Disruptor methods to run periodically
    :param disruptor_delay: (min, max) seconds to wait between disruptor functions
    """
    def get_nodes():
        """Returns the values of the OC<N> environment variables, ordered numerically by N."""
        # Anchor the pattern so that only variables named exactly OC<digits> are treated as test nodes.
        node_vars = [var for var in os.environ if re.match(r"OC[0-9]+$", var)]
        # Sort numerically so that OC10 follows OC9 rather than OC1.
        return [os.environ[var] for var in sorted(node_vars, key=lambda var: int(var[2:]))]

    # Parse the environment variables to get the list of test nodes.
    nodes = get_nodes()
    if not nodes:
        print("No test nodes found; define the OC1..OCn environment variables")
        sys.exit(1)

    # Initialize the test by resetting the distributed value.
    _init_test(name, nodes)

    # Create a history object with which to track history.
    history = History()
    disruptor = Disruptor(nodes, disruptors, disruptor_delay, history)
    # Each process talks to a randomly chosen node so that operations are spread across the cluster.
    test_processes = [Process(i + 1, name, operation_count, operation_delay, random.choice(nodes), history)
                      for i in range(processes)]

    # Start the test and run the disruptor and processes until complete.
    _start_test(disruptor, test_processes)
    _block_until_complete(disruptor, test_processes)

    # Check the recorded history for linearizability.
    _verify_test(history)
def _init_test(name, nodes):
"""Initializes the test by resetting the test value value."""
node = nodes[0]
try:
subprocess.check_call(['onos', node, 'value-test', name, 'set', 'null'])
except subprocess.CalledProcessError, e:
print "Failed to reset test data"
sys.exit(1)
def _start_test(disruptor, processes):
"""Starts the test threads."""
for process in processes:
process.start()
disruptor.start()
def _block_until_complete(disruptor, processes):
"""Runs the given disruptor and processes until complete."""
while True:
# If any process is still running, sleep and then continue to the next iteration of the loop.
if len([process for process in processes if process.is_running()]) == 0:
# Once all processes have completed, stop the disruptor.
disruptor.stop()
# Wait for the disruptor thread to complete to ensure partitions are healed and crashed nodes are recovered.
if not disruptor.is_running():
break
# If we haven't broken out of the loop by now, sleep and then check again.
time.sleep(1)
def _verify_test(history):
"""Checks the given history for linearizability."""
try:
knossos_path = os.environ['KNOSSOS_PATH']
except KeyError:
print "KNOSSOS_PATH is not defined; skipping model checker"
else:
# Create and write a temporary file to be passed to the Knossos model checker.
with NamedTemporaryFile(mode='w+', delete=False) as f:
f.write(str(history))
file_name = f.name
# Run the Knossos model checker and parse the output to determine whether the history is linearizable.
try:
output = subprocess.check_output(['lein', 'run', '--model', 'cas-register', file_name], cwd=knossos_path)
result = output.strip().split()[-1]
if result == 'true':
print "\rHistory is linearizable! :-)"
exitcode = 0
else:
print "\rHistory is not linearizable. :-("
exitcode = 1
except subprocess.CalledProcessError, e:
exitcode = e.returncode
# Remove the temporary file before exiting.
try:
os.remove(file_name)
except:
pass
sys.exit(exitcode)
def call(*args, **kwargs):
    """Executes a command and waits for it to complete, with an optional timeout.

    Accepts the same arguments as subprocess.Popen plus an optional ``timeout`` keyword (in seconds). If a timeout is
    specified, a timer thread kills the command once the timeout expires. Output from the subprocess is directed to
    os.devnull.

    :return: the command's exit status (on POSIX, negative if the command was killed by a signal, e.g. on timeout)
    """
    timeout = kwargs.pop('timeout', None)
    process = subprocess.Popen(stdout=DEVNULL, stderr=DEVNULL, *args, **kwargs)
    if timeout is None:
        # Without a timeout, simply wait for the command to finish.
        return process.wait()
    timer = threading.Timer(timeout, lambda p: p.kill(), [process])
    try:
        timer.start()
        return process.wait()
    finally:
        timer.cancel()
def get_output(*args, **kwargs):
    """Runs a command and returns its stdout, optionally killing it after a timeout.

    Takes subprocess.Popen arguments plus an optional ``timeout`` keyword (seconds). When a timeout is given, a timer
    thread kills the command once it expires. stderr is discarded to os.devnull.

    :raises subprocess.CalledProcessError: if the command exits with a non-zero status; its ``output`` carries the
        captured stdout
    """
    timeout = kwargs.pop('timeout', None)
    proc = subprocess.Popen(stdout=subprocess.PIPE, stderr=DEVNULL, *args, **kwargs)

    def collect():
        out = proc.communicate()[0]
        status = proc.poll()
        if not status:
            return out
        command = kwargs.get("args")
        if command is None:
            command = args[0]
        raise subprocess.CalledProcessError(status, command, output=out)

    if timeout is None:
        return collect()
    killer = threading.Timer(timeout, lambda p: p.kill(), [proc])
    try:
        killer.start()
        return collect()
    finally:
        killer.cancel()
class History(object):
    """Ordered log of test events, modelled on the input format of the Knossos linearizability checker.

    Every recorded entry is echoed to stdout as it arrives. str(history) renders all entries recorded so far in
    edn format.
    """
    def __init__(self):
        self.entries = []

    def record(self, entry):
        """Appends *entry* to the history and prints its stripped string form."""
        self.entries.append(entry)
        line = str(entry).strip()
        print(line + '\r')

    def __str__(self):
        formatted = [entry.format() for entry in self.entries]
        return edn_format.dumps(formatted)
class HistoryEntry(object):
    """Abstract base for entries recorded in a History.

    Subclasses implement format(); the string form of an entry is the edn encoding of format()'s result.
    """
    __metaclass__ = ABCMeta

    @abstractmethod
    def format(self):
        """Returns this entry as a structure that edn_format can serialize."""

    def __str__(self):
        encoded = edn_format.dumps(self.format())
        return encoded
class ProcessEntry(HistoryEntry):
    """History entry describing one event of an operation performed by a test process."""
    def __init__(self, process, action, operation, *values):
        self.process = process      # identifier of the process that performed the operation
        self.action = action        # event type, e.g. 'invoke', 'ok', 'fail' or 'info'
        self.operation = operation  # operation name, e.g. 'read', 'write' or 'cas'
        self.values = values        # operation argument(s)/result as a tuple

    def format(self):
        """Returns an ordered mapping with :process, :type, :f and :value keys.

        A single value is emitted as-is; zero or several values are emitted as a list.
        """
        keyword = edn_format.Keyword
        if len(self.values) == 1:
            value = self.values[0]
        else:
            value = list(self.values)
        return OrderedDict([
            (keyword('process'), self.process),
            (keyword('type'), keyword(self.action)),
            (keyword('f'), keyword(self.operation)),
            (keyword('value'), value),
        ])
class DisruptorEntry(HistoryEntry):
    """History entry describing a disruptor event; always recorded as an :info event of the :disruptor process."""
    def __init__(self, event, message):
        self.event = event      # event name, e.g. 'start' or 'stop'
        self.message = message  # human-readable description of the disruption

    def format(self):
        """Returns an ordered mapping with :process, :type, :f and :value keys."""
        keyword = edn_format.Keyword
        pairs = [
            (keyword('process'), keyword('disruptor')),
            (keyword('type'), keyword('info')),
            (keyword('f'), keyword(self.event)),
            (keyword('value'), self.message),
        ]
        return OrderedDict(pairs)
class Runnable(object):
    """Base class for managing the lifecycle of a threaded test process.

    Subclasses implement run(). start() executes run() on a daemon thread, and run() is expected to return once the
    'running' flag has been cleared by stop().
    """
    __metaclass__ = ABCMeta

    def __init__(self):
        self.thread = None
        self.running = False

    def start(self):
        """Starts run() on a new daemon thread and marks this runnable as running."""
        self.thread = threading.Thread(target=self.run)
        # A daemon thread does not prevent the interpreter from exiting (e.g. on KeyboardInterrupt in the main thread).
        self.thread.daemon = True
        self.running = True
        self.thread.start()

    @abstractmethod
    def run(self):
        """Runs the thread. This method should be overridden by implementors."""

    def is_running(self):
        """Returns True while the running flag is set or the thread has not yet exited.

        A runnable that has never been started is not running.
        """
        return self.running or (self.thread is not None and self.thread.is_alive())

    def stop(self):
        """Requests that the thread stop.

        Calling this method will not immediately stop the thread. Instead, a flag will be set, and the run() method
        is expected to exit according to the 'running' flag. Use 'is_running()' to determine whether the thread is
        stopped and has exited.
        """
        self.running = False
class Process(Runnable):
    """Test runner for a single process.

    A process simulates operations from a single actor in the cluster. When the process is started, it will begin
    performing random read, write, or cas operations, sleeping for random intervals between operations. Each operation
    performed by the process will be logged in the History object provided to the constructor. The process runs for a
    predefined number of operations or until an operation fails with an unknown error (e.g. a timeout).

    Operation methods return True if the process may continue, or False if the outcome of the operation is unknown
    (in which case the process has been stopped).
    """
    def __init__(self, id, name, operation_count, delay, node, history):
        super(Process, self).__init__()
        self.id = id
        self.name = name
        self.operation_count = operation_count
        self.delay = delay
        self.node = node
        self.history = history
        # Operations chosen from uniformly at random by _run().
        self.operations = (self.read, self.write, self.cas, self.read_and_cas)
        # Last value this process read or successfully wrote; used as the expected value for cas.
        self.value = None

    def run(self):
        """Runs up to operation_count random operations, stopping early if the process is stopped.

        The running flag is always cleared on exit, even if an operation raises, so that is_running() does not
        report a dead thread as running forever.
        """
        try:
            for _ in range(self.operation_count):
                self._wait()
                self._run()
                if not self.running:
                    break
        finally:
            self.stop()

    def _run(self):
        """Runs a random operation."""
        return random.choice(self.operations)()

    def _wait(self):
        """Blocks for a uniform random delay according to the process configuration."""
        time.sleep(random.uniform(self.delay[0], self.delay[1]))

    def _next_value(self):
        """Returns the next random value to set."""
        return random.randint(1, 10)

    def _log(self, action, operation, *values):
        """Logs an operation."""
        self.history.record(ProcessEntry(self.id, action, operation, *values))

    def _invoke(self, operation, *values):
        """Logs an operation invocation event in the process history."""
        self._log('invoke', operation, *values)

    def _ok(self, operation, *values):
        """Logs an operation success event in the process history."""
        self._log('ok', operation, *values)
        return True

    def _fail(self, operation, *values):
        """Logs an operation failure event in the process history."""
        self._log('fail', operation, *values)
        return True

    def _info(self, operation, *values):
        """Logs an operation info (unknown outcome) event in the process history and stops the process."""
        self._log('info', operation, *values)
        self.stop()
        return False

    def read(self):
        """Executes a read operation."""
        self._invoke('read', None)
        try:
            output = get_output([
                'onos',
                self.node,
                'value-test',
                self.name,
                'get'
            ], timeout=5).strip()
            value = None if output == 'null' else int(output)
        except (subprocess.CalledProcessError, ValueError):
            # A failed/timed-out command or unparseable output leaves the result of the read unknown.
            return self._info('read', None)
        self.value = value
        return self._ok('read', self.value)

    def write(self):
        """Executes a write operation with a random value."""
        next_value = self._next_value()
        self._invoke('write', next_value)
        try:
            get_output([
                'onos',
                self.node,
                'value-test',
                self.name,
                'set',
                str(next_value)
            ], timeout=5)
        except subprocess.CalledProcessError:
            return self._info('write', next_value)
        self.value = next_value
        return self._ok('write', self.value)

    def cas(self):
        """Executes a check-and-set operation from the last known value to a random new value."""
        current_value, next_value = self.value, self._next_value()
        self._invoke('cas', current_value, next_value)
        try:
            output = get_output([
                'onos',
                self.node,
                'value-test',
                self.name,
                'compareAndSet',
                str(current_value) if current_value is not None else 'null',
                str(next_value)
            ], timeout=5).strip()
        except subprocess.CalledProcessError:
            return self._info('cas', current_value, next_value)
        if output == 'true':
            self.value = next_value
            return self._ok('cas', current_value, next_value)
        if output == 'false':
            return self._fail('cas', current_value, next_value)
        # Any other output leaves the outcome of the operation unknown.
        return self._info('cas', current_value, next_value)

    def read_and_cas(self):
        """Executes a read to get the current value and then a check-and-set operation.

        The cas is skipped if the read's outcome is unknown. Returns False if either operation's outcome is unknown.
        """
        return self.read() and self.cas()
class Disruptor(Runnable):
    """Cluster disruptor runner.

    The disruptor periodically disrupts the cluster using a random disruptor function to e.g. partition the network,
    crash a node, or slow communication within the network. Only one disruptor function runs at a time, and each
    function heals its disruption before returning, so the previous disruption is healed before the next begins.
    The disruptor sleeps for a uniform random interval between disruptor functions and for the duration of each
    disruption.
    """
    # Public methods that may be selected by name as disruptor functions.
    DISRUPTORS = ('partition_random', 'partition_halves', 'partition_bridge', 'crash_random', 'delay', 'delay_random')

    def __init__(self, nodes, disruptors, delay, history):
        """
        :param nodes: test nodes passed to the onos-disrupt utility
        :param disruptors: names of disruptor functions to choose from (see DISRUPTORS)
        :param delay: (min, max) seconds to sleep between disruptor functions and during each disruption
        :param history: History in which disruptor events are recorded
        """
        super(Disruptor, self).__init__()
        self.nodes = nodes
        # Deliberately not stored as 'self.delay': an instance attribute with that name would shadow the 'delay'
        # disruptor method, so getattr(self, 'delay') below would return this range instead of the method.
        self.delay_range = delay
        self.history = history
        self.disruptors = []
        for name in disruptors:
            if name not in self.DISRUPTORS:
                print("Unknown disruptor %s" % (name,))
                sys.exit(1)
            self.disruptors.append(getattr(self, name))

    def run(self):
        """Runs the disruptor until stopped."""
        while self.running:
            self._wait()
            if self.running:
                self._run()

    def _run(self):
        """Runs a random disruptor; does nothing if no disruptors were configured."""
        if self.disruptors:
            random.choice(self.disruptors)()

    def _wait(self):
        """Waits for a uniform random delay."""
        time.sleep(random.uniform(self.delay_range[0], self.delay_range[1]))

    def _random_node(self):
        """Returns a random node on which to perform an operation."""
        return random.choice(self.nodes)

    def _log(self, event, message):
        """Logs an event in the disruptor history."""
        self.history.record(DisruptorEntry(event, message))

    def _start(self, message):
        """Logs a start event in the disruptor history."""
        self._log('start', message)

    def _stop(self, message):
        """Logs a stop event in the disruptor history."""
        self._log('stop', message)

    def _disrupt(self, *args):
        """Executes a disruptor via the onos-disrupt utility."""
        call(['onos-disrupt'] + list(args), timeout=5)

    def _partition(self, node1, node2):
        """Partitions node1 from node2."""
        self._disrupt(node1, 'partition', node2)

    def _partition_halves(self):
        """Partitions the cluster into two halves."""
        self._disrupt('partition-halves')

    def _partition_bridge(self, node):
        """Partitions the cluster with the given node as a bridge between two halves."""
        self._disrupt(node, 'partition-bridge')

    def _heal(self, node1=None, node2=None):
        """Heals a partition between two nodes or between all nodes if the given nodes are None."""
        if node1 is not None and node2 is not None:
            self._disrupt(node1, 'heal', node2)
        else:
            self._disrupt('heal')

    def _crash(self, node):
        """Crashes the given node."""
        self._disrupt(node, 'crash')

    def _recover(self, node):
        """Recovers the given node from a crash."""
        self._disrupt(node, 'recover')

    def _delay(self, node=None):
        """Delays communication from all nodes or from the given node if specified."""
        if node is not None:
            self._disrupt(node, 'delay')
        else:
            self._disrupt('delay')

    def _restore(self, node=None):
        """Restores communication on all nodes or on the given node if specified."""
        if node is not None:
            self._disrupt(node, 'restore')
        else:
            self._disrupt('restore')

    def partition_random(self):
        """Partitions two random, distinct nodes from each other.

        Does nothing if there are fewer than two distinct nodes, since no such pair exists.
        """
        candidates = sorted(set(self.nodes))
        if len(candidates) < 2:
            return
        node1, node2 = random.sample(candidates, 2)
        self._start("Cut off %s->%s" % (node1, node2))
        self._partition(node1, node2)
        self._wait()
        self._heal(node1, node2)
        self._stop("Fully connected")

    def partition_halves(self):
        """Partitions the cluster into two halves."""
        self._start("Partitioning network into two halves")
        self._partition_halves()
        self._wait()
        self._heal()
        self._stop("Fully connected")

    def partition_bridge(self):
        """Partitions the cluster into two halves with a bridge between them."""
        node = self._random_node()
        self._start("Partitioning network with bridge %s" % (node,))
        self._partition_bridge(node)
        self._wait()
        self._heal()
        self._stop("Fully connected")

    def crash_random(self):
        """Crashes a random node."""
        node = self._random_node()
        self._start("Crashing %s" % (node,))
        self._crash(node)
        self._wait()
        self._recover(node)
        self._stop("Recovered %s" % (node,))

    def delay(self):
        """Delays messages on all nodes."""
        self._start("Delay communication on all nodes")
        self._delay()
        self._wait()
        self._restore()
        self._stop("Communication restored")

    def delay_random(self):
        """Delays communication on a random node."""
        node = self._random_node()
        self._start("Delay communication on %s" % (node,))
        self._delay(node)
        self._wait()
        self._restore(node)
        self._stop("Communication restored on %s" % (node,))
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="""
Perform random operations on a distributed primitive using multiple concurrent
processes and verify the linearizability of the history.
"""
    )
    parser.add_argument(
        '-n',
        '--primitive-name',
        type=str,
        default='test',
        help="Name of the AtomicValue primitive to test. Defaults to 'test'"
    )
    parser.add_argument(
        '-p',
        '--parallelism',
        type=int,
        default=8,
        help="Number of parallel processes with which to test. Defaults to 8"
    )
    parser.add_argument(
        '-c',
        '--operation-count',
        type=int,
        default=50,
        help="Number of operations to execute per process. Defaults to 50 operations per process"
    )
    # Delays are parsed as floats so that sub-second intervals can be used; integers are still accepted.
    parser.add_argument(
        '-od',
        '--operation-delay',
        type=float,
        nargs=2,
        default=[1, 5],
        help="Uniform random delay to wait between operations. Defaults to 1-5 seconds per operation"
    )
    parser.add_argument(
        '-d',
        '--disruptors',
        type=str,
        nargs='*',
        default=['partition_random'],
        choices=['partition_random', 'partition_halves', 'partition_bridge', 'crash_random', 'delay', 'delay_random'],
        help="""
List of disruptor functions to run. Disruptors will be run periodically
according to the configured --disruptor-delay. Defaults to 'partition_random'
"""
    )
    parser.add_argument(
        '-dd',
        '--disruptor-delay',
        type=float,
        nargs=2,
        default=[15, 30],
        help="Uniform random delay to wait between disruptor functions. Defaults to 15-30 seconds between disruptors"
    )
    args = parser.parse_args()

    # Reject negative delays up front; time.sleep() would otherwise raise inside the worker threads.
    for option, bounds in (('--operation-delay', args.operation_delay), ('--disruptor-delay', args.disruptor_delay)):
        if min(bounds) < 0:
            parser.error("%s values must be non-negative" % (option,))

    try:
        run_test(
            args.primitive_name,
            args.parallelism,
            args.operation_count,
            args.operation_delay,
            args.disruptors,
            args.disruptor_delay
        )
    except KeyboardInterrupt:
        sys.exit(1)