[ONOS-6842] Add fault injection/linearizable history verification script for distributed primitives
Change-Id: I059e21c1e1626f555889686387ec31dda7db8da4
diff --git a/tools/test/bin/onos-dist-verify b/tools/test/bin/onos-dist-verify
new file mode 100755
index 0000000..9106428
--- /dev/null
+++ b/tools/test/bin/onos-dist-verify
@@ -0,0 +1,646 @@
+#!/usr/bin/env python
+"""
+usage: onos-dist-verify [-h] [-n PRIMITIVE_NAME] [-p PARALLELISM]
+ [-c OPERATION_COUNT]
+ [-od OPERATION_DELAY OPERATION_DELAY]
+ [-d [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} ...]]]
+ [-dd DISRUPTOR_DELAY DISRUPTOR_DELAY]
+
+Perform random operations on a distributed primitive using multiple concurrent
+processes and verify the linearizability of the history.
+
+optional arguments:
+ -h, --help show this help message and exit
+ -n PRIMITIVE_NAME, --primitive-name PRIMITIVE_NAME
+ Name of the AtomicValue primitive to test. Defaults to
+ 'test'
+ -p PARALLELISM, --parallelism PARALLELISM
+ Number of parallel processes with which to test.
+ Defaults to 8
+ -c OPERATION_COUNT, --operation-count OPERATION_COUNT
+ Number of operations to execute per process. Defaults
+ to 50 operations per process
+ -od OPERATION_DELAY OPERATION_DELAY, --operation-delay OPERATION_DELAY OPERATION_DELAY
+ Uniform random delay to wait between operations.
+ Defaults to 1-5 seconds per operation
+ -d [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} ...]], --disruptors [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} ...]]
+ List of disruptor functions to run. Disruptors will be
+ run periodically according to the configured
+ --disruptor-delay. Defaults to 'partition_random'
+ -dd DISRUPTOR_DELAY DISRUPTOR_DELAY, --disruptor-delay DISRUPTOR_DELAY DISRUPTOR_DELAY
+ Uniform random delay to wait between disruptor
+ functions. Defaults to 15-30 seconds between disruptors
+"""
+import argparse
+import random
+import re
+import sys
+import threading
+import time
+import os
+import subprocess
+from collections import OrderedDict
+from tempfile import NamedTemporaryFile
+from abc import ABCMeta, abstractmethod
+
+# The edn_format module is required to run tests
+try:
+ import edn_format
+except ImportError:
+ print "No edn_format module found; use 'pip install edn_format'"
+ sys.exit(1)
+
+DEVNULL = open(os.devnull, 'w')
+
+
+def run_test(name, processes, operation_count, operation_delay, disruptors, disruptor_delay):
+ """Runs the linearizability test."""
+
+ def get_nodes():
+ """Parses environment variables to get a sorted list of test nodes."""
+ alphanum_key = lambda key: [int(c) if c.isdigit() else c.lower() for c in re.split('([0-9]+)', key)]
+ vars = [var for var in os.environ if re.match(r"OC[0-9]+", var)]
+ return [os.environ[v] for v in sorted(vars, key=alphanum_key)]
+
+ # Parse the environment variables to get the list of test nodes.
+ nodes = get_nodes()
+
+ # Initialize the test by resetting the distributed value.
+ _init_test(name, nodes)
+
+ # Create a history object with which to track history
+ history = History()
+ disruptor = Disruptor(nodes, disruptors, disruptor_delay, history)
+ processes = [Process(i+1, name, operation_count, operation_delay, random.choice(nodes), history) for i in range(processes)]
+
+ # Start the test.
+ _start_test(disruptor, processes)
+
+ # Run the disruptor and processes until complete.
+ _block_until_complete(disruptor, processes)
+
+ # Check the recorded history for linearizability.
+ _verify_test(history)
+
+
+def _init_test(name, nodes):
+ """Initializes the test by resetting the test value value."""
+ node = nodes[0]
+ try:
+ subprocess.check_call(['onos', node, 'value-test', name, 'set', 'null'])
+ except subprocess.CalledProcessError, e:
+ print "Failed to reset test data"
+ sys.exit(1)
+
+
+def _start_test(disruptor, processes):
+ """Starts the test threads."""
+ for process in processes:
+ process.start()
+ disruptor.start()
+
+
+def _block_until_complete(disruptor, processes):
+ """Runs the given disruptor and processes until complete."""
+ while True:
+ # If any process is still running, sleep and then continue to the next iteration of the loop.
+ if len([process for process in processes if process.is_running()]) == 0:
+ # Once all processes have completed, stop the disruptor.
+ disruptor.stop()
+
+ # Wait for the disruptor thread to complete to ensure partitions are healed and crashed nodes are recovered.
+ if not disruptor.is_running():
+ break
+
+ # If we haven't broken out of the loop by now, sleep and then check again.
+ time.sleep(1)
+
+
+def _verify_test(history):
+ """Checks the given history for linearizability."""
+ try:
+ knossos_path = os.environ['KNOSSOS_PATH']
+ except KeyError:
+ print "KNOSSOS_PATH is not defined; skipping model checker"
+ else:
+ # Create and write a temporary file to be passed to the Knossos model checker.
+ with NamedTemporaryFile(mode='w+', delete=False) as f:
+ f.write(str(history))
+ file_name = f.name
+
+ # Run the Knossos model checker and parse the output to determine whether the history is linearizable.
+ try:
+ output = subprocess.check_output(['lein', 'run', '--model', 'cas-register', file_name], cwd=knossos_path)
+ result = output.strip().split()[-1]
+ if result == 'true':
+ print "\rHistory is linearizable! :-)"
+ exitcode = 0
+ else:
+ print "\rHistory is not linearizable. :-("
+ exitcode = 1
+ except subprocess.CalledProcessError, e:
+ exitcode = e.returncode
+
+ # Remove the temporary file before exiting.
+ try:
+ os.remove(file_name)
+ except:
+ pass
+ sys.exit(exitcode)
+
+
+def call(*args, **kwargs):
+ """Executes a command, awaiting the completion of the command with an optional timeout.
+
+ If a timeout is specified, a timer thread waits for the commit to complete or kills the command once the timeout
+ expires. Output from the subprocess is directed to os.devnull.
+ """
+ timeout = kwargs.pop('timeout', None)
+ process = subprocess.Popen(stdout=DEVNULL, stderr=DEVNULL, *args, **kwargs)
+ if timeout is not None:
+ timer = threading.Timer(timeout, lambda p: p.kill(), [process])
+ try:
+ timer.start()
+ return process.wait()
+ finally:
+ timer.cancel()
+
+
+def get_output(*args, **kwargs):
+ """Executes a command, awaiting the output of the command with an optional timeout.
+
+ If a timeout is specified, a timer thread waits for the commit to complete or kills the command once the timeout
+ expires. stderr output from the subprocess is directed to os.devnull. stdout is returned.
+ """
+ timeout = kwargs.pop('timeout', None)
+ process = subprocess.Popen(stdout=subprocess.PIPE, stderr=DEVNULL, *args, **kwargs)
+
+ def join_process():
+ stdout, stderr = process.communicate()
+ retcode = process.poll()
+ if retcode:
+ cmd = kwargs.get("args")
+ if cmd is None:
+ cmd = args[0]
+ raise subprocess.CalledProcessError(retcode, cmd, output=stdout)
+ return stdout
+
+ if timeout is not None:
+ timer = threading.Timer(timeout, lambda p: p.kill(), [process])
+ try:
+ timer.start()
+ return join_process()
+ finally:
+ timer.cancel()
+ else:
+ return join_process()
+
+
+class History(object):
+ """Records and logs the history of operations.
+
+ This object directly mimics the format expected by the Knossos linearizability checker. Events are logged in
+ edn format, and str(history) will return the full history in edn format.
+ """
+ def __init__(self):
+ self.entries = []
+
+ def record(self, entry):
+ """Records an entry in the history."""
+ self.entries.append(entry)
+ print str(entry).strip() + '\r'
+
+ def __str__(self):
+ return edn_format.dumps([entry.format() for entry in self.entries])
+
+
+class HistoryEntry(object):
+ """History entry."""
+ __metaclass__ = ABCMeta
+
+ @abstractmethod
+ def format(self):
+ """Returns the entry in EDN format."""
+
+ def __str__(self):
+ return edn_format.dumps(self.format())
+
+
+class ProcessEntry(HistoryEntry):
+ """Process entry."""
+ def __init__(self, process, action, operation, *values):
+ self.process = process
+ self.action = action
+ self.operation = operation
+ self.values = values
+
+ def format(self):
+ return OrderedDict([
+ (edn_format.Keyword('process'), self.process),
+ (edn_format.Keyword('type'), edn_format.Keyword(self.action)),
+ (edn_format.Keyword('f'), edn_format.Keyword(self.operation)),
+ (edn_format.Keyword('value'), self.values[0] if len(self.values) == 1 else list(self.values))
+ ])
+
+
+class DisruptorEntry(HistoryEntry):
+ """Disruptor history entry."""
+ def __init__(self, event, message):
+ self.event = event
+ self.message = message
+
+ def format(self):
+ return OrderedDict([
+ (edn_format.Keyword('process'), edn_format.Keyword('disruptor')),
+ (edn_format.Keyword('type'), edn_format.Keyword('info')),
+ (edn_format.Keyword('f'), edn_format.Keyword(self.event)),
+ (edn_format.Keyword('value'), self.message)
+ ])
+
+
+class Runnable(object):
+ """Base class for managing the lifecycle of a threaded test process."""
+ __metaclass__ = ABCMeta
+
+ def __init__(self):
+ self.thread = None
+ self.running = False
+
+ def start(self):
+ """Starts the runnable thread."""
+ self.thread = threading.Thread(target=self.run)
+ self.thread.daemon = True
+ self.running = True
+ self.thread.start()
+
+ @abstractmethod
+ def run(self):
+ """Runs the thread. This method should be overridden by implementors."""
+
+ def is_running(self):
+ """Returns a boolean indicating whether the disruptor is running."""
+ return self.running or self.thread.is_alive()
+
+ def stop(self):
+ """Stops the disruptor thread.
+
+ Calling this method will not immediately stop the thread. Instead, a flag will be set, and the run() method
+ is expected to exit according to the 'running' flag. Use 'is_running()' to determine whether the thread is
+ stopped and has exited.
+ """
+ self.running = False
+
+
+class Process(Runnable):
+ """Test runner for a single process.
+
+ A process simulates operations from a single actor in the cluster. When the process is started, it will begin
+ performing random read, write, or cas operations, sleeping for random intervals between operations. Each operation
+ performed by the process will be logged in the History object provided to the constructor. The process runs for a
+ predefined number of operations or until an operation fails with an unknown error (e.g. a timeout).
+ """
+ def __init__(self, id, name, operation_count, delay, node, history):
+ super(Process, self).__init__()
+ self.id = id
+ self.name = name
+ self.operation_count = operation_count
+ self.delay = delay
+ self.node = node
+ self.history = history
+ self.operations = (self.read, self.write, self.cas, self.read_and_cas)
+ self.value = None
+
+ def run(self):
+ """Runs the process."""
+ for _ in range(self.operation_count):
+ self._wait()
+ self._run()
+ if not self.running:
+ break
+ if self.running:
+ self.stop()
+
+ def _run(self):
+ """Runs a random operation."""
+ return random.choice(self.operations)()
+
+ def _wait(self):
+ """Blocks for a uniform random delay according to the process configuration."""
+ time.sleep(random.uniform(self.delay[0], self.delay[1]))
+
+ def _next_value(self):
+ """Returns the next random value to set."""
+ return random.randint(1, 10)
+
+ def _log(self, action, operation, *values):
+ """Logs an operation."""
+ self.history.record(ProcessEntry(self.id, action, operation, *values))
+
+ def _invoke(self, operation, *values):
+ """Logs an operation invocation event in the process history."""
+ self._log('invoke', operation, *values)
+
+ def _ok(self, operation, *values):
+ """Logs an operation success event in the process history."""
+ self._log('ok', operation, *values)
+ return True
+
+ def _fail(self, operation, *values):
+ """Logs an operation failure event in the process history."""
+ self._log('fail', operation, *values)
+ return True
+
+ def _info(self, operation, *values):
+ """Logs an operation info event in the process history and stops the process."""
+ self._log('info', operation, *values)
+ self.stop()
+ return False
+
+ def read(self):
+ """Executes a read operation."""
+ self._invoke('read', None)
+ try:
+ output = get_output([
+ 'onos',
+ self.node,
+ 'value-test',
+ self.name,
+ 'get'
+ ], timeout=5).strip()
+ self.value = None if output == 'null' else int(output)
+ return self._ok('read', self.value)
+ except subprocess.CalledProcessError:
+ return self._info('read', None)
+
+ def write(self):
+ """Executes a write operation."""
+ next_value = self._next_value()
+ self._invoke('write', next_value)
+ try:
+ get_output([
+ 'onos',
+ self.node,
+ 'value-test',
+ self.name,
+ 'set',
+ str(next_value)
+ ], timeout=5)
+ self.value = next_value
+ return self._ok('write', self.value)
+ except subprocess.CalledProcessError:
+ return self._info('write', next_value)
+
+ def cas(self):
+ """Executes a check-and-set operation."""
+ current_value, next_value = self.value, self._next_value()
+ self._invoke('cas', current_value, next_value)
+ try:
+ output = get_output([
+ 'onos',
+ self.node,
+ 'value-test',
+ self.name,
+ 'compareAndSet',
+ str(current_value) if current_value is not None else 'null',
+ str(next_value)
+ ], timeout=5).strip()
+
+ if output == 'true':
+ self.value = next_value
+ return self._ok('cas', current_value, next_value)
+ elif output == 'false':
+ return self._fail('cas', current_value, next_value)
+ else:
+ return self._info('cas', current_value, next_value)
+ except subprocess.CalledProcessError:
+ return self._info('cas', current_value, next_value)
+
+ def read_and_cas(self):
+ """Executes a read to get the current value and then a check-and-set operation."""
+ if self.read():
+ self.cas()
+
+
+class Disruptor(Runnable):
+ """Cluster disruptor runner.
+
+ The disruptor periodically disrupts the cluster using a random disruptor function to e.g. partition the network,
+ crash a node, or slow communication within the network. The disruptor guarantees that only one disruptor function
+ will run at any given time and the previous disruptor will be healed prior to the next disruptor beginning.
+ The disruptor sleeps for a uniform random interval between disruptor functions.
+ """
+ def __init__(self, nodes, disruptors, delay, history):
+ super(Disruptor, self).__init__()
+ self.nodes = nodes
+ self.delay = delay
+ self.history = history
+ self.disruptors = []
+ for name in disruptors:
+ try:
+ self.disruptors.append(getattr(self, name))
+ except AttributeError:
+ print "Unknown disruptor %s" % (name,)
+ sys.exit(1)
+
+ def run(self):
+ """Runs the disruptor until stopped."""
+ while self.running:
+ self._wait()
+ if self.running:
+ self._run()
+
+ def _run(self):
+ """Runs a random disruptor."""
+ random.choice(self.disruptors)()
+
+ def _wait(self):
+ """Waits for a uniform random delay."""
+ time.sleep(random.uniform(self.delay[0], self.delay[1]))
+
+ def _random_node(self):
+ """Returns a random node on which to perform an operation."""
+ return random.choice(self.nodes)
+
+ def _log(self, event, message):
+ """Logs an event in the disruptor history."""
+ self.history.record(DisruptorEntry(event, message))
+
+ def _start(self, message):
+ """Logs a start event in the disruptor history."""
+ self._log('start', message)
+
+ def _stop(self, message):
+ """Logs a stop event in the disruptor history."""
+ self._log('stop', message)
+
+ def _disrupt(self, *args):
+ """Executes a disruptor via the onos-disrupt utility."""
+ call(['onos-disrupt'] + list(args), timeout=5)
+
+ def _partition(self, node1, node2):
+ """Partitions node1 from node2."""
+ self._disrupt(node1, 'partition', node2)
+
+ def _partition_halves(self):
+ """Partitions the cluster into two halves."""
+ self._disrupt('partition-halves')
+
+ def _partition_bridge(self, nodes):
+ """Partitions the cluster with the given node as a bridge between two halves."""
+ self._disrupt(nodes, 'partition-bridge')
+
+ def _heal(self, node1=None, node2=None):
+ """Heals a partition between two nodes or between all nodes if the given nodes are None."""
+ if node1 is not None and node2 is not None:
+ self._disrupt(node1, 'heal', node2)
+ else:
+ self._disrupt('heal')
+
+ def _crash(self, node):
+ """Crashes the given node."""
+ self._disrupt(node, 'crash')
+
+ def _recover(self, node):
+ """Recovers the given node from a crash."""
+ self._disrupt(node, 'recover')
+
+ def _delay(self, node=None):
+ """Delays communication from all nodes or from the given node if specified."""
+ if node is not None:
+ self._disrupt(node, 'delay')
+ else:
+ self._disrupt('delay')
+
+ def _restore(self, node=None):
+ """Restores communication on all nodes or on the given node if specified."""
+ if node is not None:
+ self._disrupt(node, 'restore')
+ else:
+ self._disrupt('restore')
+
+ def partition_random(self):
+ """Partitions two random nodes from each other."""
+ node1 = self._random_node()
+ node2 = node1
+ while node2 == node1:
+ node2 = self._random_node()
+ self._start("Cut off %s->%s" % (node1, node2))
+ self._partition(node1, node2)
+ self._wait()
+ self._heal(node1, node2)
+ self._stop("Fully connected")
+
+ def partition_halves(self):
+ """Partitions the cluster into two halves."""
+ self._start("Partitioning network into two halves")
+ self._partition_halves()
+ self._wait()
+ self._heal()
+ self._stop("Fully connected")
+
+ def partition_bridge(self):
+ """Partitions the cluster into two halves with a bridge between them."""
+ node = self._random_node()
+ self._start("Partitioning network with bridge %s" % (node,))
+ self._partition_bridge(node)
+ self._wait()
+ self._heal()
+ self._stop("Fully connected")
+
+ def crash_random(self):
+ """Crashes a random node."""
+ node = random.choice(self.nodes)
+ self._start("Crashing %s" % (node,))
+ self._crash(node)
+ self._wait()
+ self._recover(node)
+ self._stop("Recovered %s" % (node,))
+
+ def delay(self):
+ """Delays messages on all nodes."""
+ self._start("Delay communication on all nodes")
+ self._delay()
+ self._wait()
+ self._restore()
+ self._stop("Communication restored")
+
+ def delay_random(self):
+ """Delays communication on a random node."""
+ node = self._random_node()
+ self._start("Delay communication on %s" % (node,))
+ self._delay(node)
+ self._wait()
+ self._restore(node)
+ self._stop("Communication restored on %s" % (node,))
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ description="""
+ Perform random operations on a distributed primitive using multiple concurrent
+ processes and verify the linearizability of the history.
+ """
+ )
+ parser.add_argument(
+ '-n',
+ '--primitive-name',
+ type=str,
+ default='test',
+ help="Name of the AtomicValue primitive to test. Defaults to 'test'"
+ )
+ parser.add_argument(
+ '-p',
+ '--parallelism',
+ type=int,
+ default=8,
+ help="Number of parallel processes with which to test. Defaults to 8"
+ )
+ parser.add_argument(
+ '-c',
+ '--operation-count',
+ type=int,
+ default=50,
+ help="Number of operations to execute per process. Defaults to 50 operations per process"
+ )
+ parser.add_argument(
+ '-od',
+ '--operation-delay',
+ type=int,
+ nargs=2,
+ default=[1, 5],
+ help="Uniform random delay to wait between operations. Defaults to 1-5 seconds per operation"
+ )
+ parser.add_argument(
+ '-d',
+ '--disruptors',
+ type=str,
+ nargs='*',
+ default=['partition_random'],
+ choices=['partition_random', 'partition_halves', 'partition_bridge', 'crash_random', 'delay', 'delay_random'],
+ help="""
+ List of disruptor functions to run. Disruptors will be run periodically
+ according to the configured --disruptor-delay. Defaults to 'partition_random'
+ """
+ )
+ parser.add_argument(
+ '-dd',
+ '--disruptor-delay',
+ type=int,
+ nargs=2,
+ default=[15, 30],
+ help="Uniform random delay to wait between disruptor functions.Defaults to 15-30 seconds between disruptors"
+ )
+
+ args = parser.parse_args()
+
+ try:
+ run_test(
+ args.primitive_name,
+ args.parallelism,
+ args.operation_count,
+ args.operation_delay,
+ args.disruptors,
+ args.disruptor_delay
+ )
+ except KeyboardInterrupt:
+ sys.exit(1)