[ONOS-6842] Add fault injection/linearizable history verification script for distributed primitives

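Example invocation (illustrative; assumes the test cell exports $OC1..$OCn
and, optionally, KNOSSOS_PATH pointing at a local Knossos checkout):

    onos-dist-verify -n test -p 8 -c 50 -d partition_random crash_random
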
Change-Id: I059e21c1e1626f555889686387ec31dda7db8da4
diff --git a/tools/test/bin/onos-disrupt b/tools/test/bin/onos-disrupt
index ad20845..c8e343f 100755
--- a/tools/test/bin/onos-disrupt
+++ b/tools/test/bin/onos-disrupt
@@ -28,6 +28,14 @@
     echo "                        Examples:"
     echo "                            $command_name 10.127.10.111 crash"
     echo
+    echo "             recover    Recovers the given host"
+    echo
+    echo "                        Syntax:"
+    echo "                            $command_name <host> recover"
+    echo
+    echo "                        Examples:"
+    echo "                            $command_name 10.127.10.111 recover"
+    echo
     echo "           partition    Partitions the given host from a list of source host or all ONOS hosts if no sources are specified"
     echo
     echo "                        Syntax:"
@@ -49,10 +57,10 @@
     echo "    partition-bridge    Creates a bridge partition where the cluster is partitioned into two halves and the target host is connected to both halves"
     echo
     echo "                        Syntax:"
-    echo "                            $command_name <HOST> bridge"
+    echo "                            $command_name <HOST> partition-bridge"
     echo
     echo "                        Examples:"
-    echo "                            $command_name 10.127.10.111 bridge"
+    echo "                            $command_name 10.127.10.111 partition-bridge"
     echo
     echo "                heal    Heals a partition on the given host"
     echo
@@ -276,6 +284,12 @@
         onos-kill "${host}"
         ;;
 
+    # Recovers a crashed node
+    recover)
+        onos-service "${host}" start
+        onos-wait-for-start "${host}"
+        ;;
+
     # Creates a partition between the source node and a set of destination nodes.
     partition)
 
diff --git a/tools/test/bin/onos-dist-verify b/tools/test/bin/onos-dist-verify
new file mode 100755
index 0000000..9106428
--- /dev/null
+++ b/tools/test/bin/onos-dist-verify
@@ -0,0 +1,646 @@
+#!/usr/bin/env python
+"""
+usage: onos-dist-verify [-h] [-n PRIMITIVE_NAME] [-p PARALLELISM]
+                        [-c OPERATION_COUNT]
+                        [-od OPERATION_DELAY OPERATION_DELAY]
+                        [-d [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} ...]]]
+                        [-dd DISRUPTOR_DELAY DISRUPTOR_DELAY]
+
+Perform random operations on a distributed primitive using multiple concurrent
+processes and verify the linearizability of the history.
+
+optional arguments:
+  -h, --help            show this help message and exit
+  -n PRIMITIVE_NAME, --primitive-name PRIMITIVE_NAME
+                        Name of the AtomicValue primitive to test. Defaults to
+                        'test'
+  -p PARALLELISM, --parallelism PARALLELISM
+                        Number of parallel processes with which to test.
+                        Defaults to 8
+  -c OPERATION_COUNT, --operation-count OPERATION_COUNT
+                        Number of operations to execute per process. Defaults
+                        to 50 operations per process
+  -od OPERATION_DELAY OPERATION_DELAY, --operation-delay OPERATION_DELAY OPERATION_DELAY
+                        Uniform random delay to wait between operations.
+                        Defaults to 1-5 seconds per operation
+  -d [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} ...]], --disruptors [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} [{partition_random,partition_halves,partition_bridge,crash_random,delay,delay_random} ...]]
+                        List of disruptor functions to run. Disruptors will be
+                        run periodically according to the configured
+                        --disruptor-delay. Defaults to 'partition_random'
+  -dd DISRUPTOR_DELAY DISRUPTOR_DELAY, --disruptor-delay DISRUPTOR_DELAY DISRUPTOR_DELAY
+                        Uniform random delay to wait between disruptor
+                        functions. Defaults to 15-30 seconds between disruptors
+"""
+import argparse
+import random
+import re
+import sys
+import threading
+import time
+import os
+import subprocess
+from collections import OrderedDict
+from tempfile import NamedTemporaryFile
+from abc import ABCMeta, abstractmethod
+
+# The edn_format module is required to run tests
+try:
+    import edn_format
+except ImportError:
+    print "No edn_format module found; use 'pip install edn_format'"
+    sys.exit(1)
+
+DEVNULL = open(os.devnull, 'w')
+
+
+def run_test(name, processes, operation_count, operation_delay, disruptors, disruptor_delay):
+    """Runs the linearizability test."""
+
+    def get_nodes():
+        """Parses environment variables to get a sorted list of test nodes."""
+        alphanum_key = lambda key: [int(c) if c.isdigit() else c.lower() for c in re.split('([0-9]+)', key)]
+        vars = [var for var in os.environ if re.match(r"OC[0-9]+", var)]
+        return [os.environ[v] for v in sorted(vars, key=alphanum_key)]
+
+    # Parse the environment variables to get the list of test nodes.
+    nodes = get_nodes()
+
+    # Initialize the test by resetting the distributed value.
+    _init_test(name, nodes)
+
+    # Create the shared history recorder, the disruptor, and the test processes.
+    history = History()
+    disruptor = Disruptor(nodes, disruptors, disruptor_delay, history)
+    processes = [Process(i+1, name, operation_count, operation_delay, random.choice(nodes), history) for i in range(processes)]
+
+    # Start the test.
+    _start_test(disruptor, processes)
+
+    # Run the disruptor and processes until complete.
+    _block_until_complete(disruptor, processes)
+
+    # Check the recorded history for linearizability.
+    _verify_test(history)
+
+
+def _init_test(name, nodes):
+    """Initializes the test by resetting the test value value."""
+    node = nodes[0]
+    try:
+        subprocess.check_call(['onos', node, 'value-test', name, 'set', 'null'])
+    except subprocess.CalledProcessError:
+        print "Failed to reset test data"
+        sys.exit(1)
+
+
+def _start_test(disruptor, processes):
+    """Starts the test threads."""
+    for process in processes:
+        process.start()
+    disruptor.start()
+
+
+def _block_until_complete(disruptor, processes):
+    """Runs the given disruptor and processes until complete."""
+    while True:
+        # Once all processes have completed, stop the disruptor.
+        if not any(process.is_running() for process in processes):
+            disruptor.stop()
+
+            # Wait for the disruptor thread to complete to ensure partitions are healed and crashed nodes are recovered.
+            if not disruptor.is_running():
+                break
+
+        # If we haven't broken out of the loop by now, sleep and then check again.
+        time.sleep(1)
+
+
+def _verify_test(history):
+    """Checks the given history for linearizability."""
+    try:
+        knossos_path = os.environ['KNOSSOS_PATH']
+    except KeyError:
+        print "KNOSSOS_PATH is not defined; skipping model checker"
+    else:
+        # Create and write a temporary file to be passed to the Knossos model checker.
+        with NamedTemporaryFile(mode='w+', delete=False) as f:
+            f.write(str(history))
+            file_name = f.name
+
+        # Run the Knossos model checker and parse the output to determine whether the history is linearizable.
+        try:
+            output = subprocess.check_output(['lein', 'run', '--model', 'cas-register', file_name], cwd=knossos_path)
+            result = output.strip().split()[-1]
+            if result == 'true':
+                print "\rHistory is linearizable! :-)"
+                exitcode = 0
+            else:
+                print "\rHistory is not linearizable. :-("
+                exitcode = 1
+        except subprocess.CalledProcessError, e:
+            exitcode = e.returncode
+
+        # Remove the temporary file before exiting.
+        try:
+            os.remove(file_name)
+        except OSError:
+            pass
+        sys.exit(exitcode)
+
+
+def call(*args, **kwargs):
+    """Executes a command, awaiting the completion of the command with an optional timeout.
+
+    If a timeout is specified, a timer thread waits for the command to complete or kills it once the timeout
+    expires. Output from the subprocess is directed to os.devnull.
+    """
+    timeout = kwargs.pop('timeout', None)
+    process = subprocess.Popen(stdout=DEVNULL, stderr=DEVNULL, *args, **kwargs)
+    if timeout is not None:
+        timer = threading.Timer(timeout, lambda p: p.kill(), [process])
+        try:
+            timer.start()
+            return process.wait()
+        finally:
+            timer.cancel()
+
+
+def get_output(*args, **kwargs):
+    """Executes a command, awaiting the output of the command with an optional timeout.
+
+    If a timeout is specified, a timer thread waits for the command to complete or kills it once the timeout
+    expires. stderr output from the subprocess is directed to os.devnull. stdout is returned.
+    """
+    timeout = kwargs.pop('timeout', None)
+    process = subprocess.Popen(stdout=subprocess.PIPE, stderr=DEVNULL, *args, **kwargs)
+
+    def join_process():
+        stdout, stderr = process.communicate()
+        retcode = process.poll()
+        if retcode:
+            cmd = kwargs.get("args")
+            if cmd is None:
+                cmd = args[0]
+            raise subprocess.CalledProcessError(retcode, cmd, output=stdout)
+        return stdout
+
+    if timeout is not None:
+        timer = threading.Timer(timeout, lambda p: p.kill(), [process])
+        try:
+            timer.start()
+            return join_process()
+        finally:
+            timer.cancel()
+    else:
+        return join_process()
+
+
+class History(object):
+    """Records and logs the history of operations.
+
+    This object directly mimics the format expected by the Knossos linearizability checker. Events are logged in
+    edn format, and str(history) will return the full history in edn format.
+    """
+    def __init__(self):
+        self.entries = []
+
+    def record(self, entry):
+        """Records an entry in the history."""
+        self.entries.append(entry)
+        print str(entry).strip() + '\r'
+
+    def __str__(self):
+        return edn_format.dumps([entry.format() for entry in self.entries])
+
+
+class HistoryEntry(object):
+    """History entry."""
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def format(self):
+        """Returns the entry in EDN format."""
+
+    def __str__(self):
+        return edn_format.dumps(self.format())
+
+
+class ProcessEntry(HistoryEntry):
+    """Process entry."""
+    def __init__(self, process, action, operation, *values):
+        self.process = process
+        self.action = action
+        self.operation = operation
+        self.values = values
+
+    def format(self):
+        return OrderedDict([
+            (edn_format.Keyword('process'), self.process),
+            (edn_format.Keyword('type'), edn_format.Keyword(self.action)),
+            (edn_format.Keyword('f'), edn_format.Keyword(self.operation)),
+            (edn_format.Keyword('value'), self.values[0] if len(self.values) == 1 else list(self.values))
+        ])
+
+
+class DisruptorEntry(HistoryEntry):
+    """Disruptor history entry."""
+    def __init__(self, event, message):
+        self.event = event
+        self.message = message
+
+    def format(self):
+        return OrderedDict([
+            (edn_format.Keyword('process'), edn_format.Keyword('disruptor')),
+            (edn_format.Keyword('type'), edn_format.Keyword('info')),
+            (edn_format.Keyword('f'), edn_format.Keyword(self.event)),
+            (edn_format.Keyword('value'), self.message)
+        ])
+
+
+class Runnable(object):
+    """Base class for managing the lifecycle of a threaded test process."""
+    __metaclass__ = ABCMeta
+
+    def __init__(self):
+        self.thread = None
+        self.running = False
+
+    def start(self):
+        """Starts the runnable thread."""
+        self.thread = threading.Thread(target=self.run)
+        self.thread.daemon = True
+        self.running = True
+        self.thread.start()
+
+    @abstractmethod
+    def run(self):
+        """Runs the thread. This method should be overridden by implementors."""
+
+    def is_running(self):
+        """Returns a boolean indicating whether the disruptor is running."""
+        return self.running or self.thread.is_alive()
+
+    def stop(self):
+        """Stops the disruptor thread.
+
+        Calling this method will not immediately stop the thread. Instead, a flag will be set, and the run() method
+        is expected to exit according to the 'running' flag. Use 'is_running()' to determine whether the thread is
+        stopped and has exited.
+        """
+        self.running = False
+
+
+class Process(Runnable):
+    """Test runner for a single process.
+
+    A process simulates operations from a single actor in the cluster. When the process is started, it will begin
+    performing random read, write, or cas operations, sleeping for random intervals between operations. Each operation
+    performed by the process will be logged in the History object provided to the constructor. The process runs for a
+    predefined number of operations or until an operation fails with an unknown error (e.g. a timeout).
+    """
+    def __init__(self, id, name, operation_count, delay, node, history):
+        super(Process, self).__init__()
+        self.id = id
+        self.name = name
+        self.operation_count = operation_count
+        self.delay = delay
+        self.node = node
+        self.history = history
+        self.operations = (self.read, self.write, self.cas, self.read_and_cas)
+        self.value = None
+
+    def run(self):
+        """Runs the process."""
+        for _ in range(self.operation_count):
+            self._wait()
+            self._run()
+            if not self.running:
+                break
+        if self.running:
+            self.stop()
+
+    def _run(self):
+        """Runs a random operation."""
+        return random.choice(self.operations)()
+
+    def _wait(self):
+        """Blocks for a uniform random delay according to the process configuration."""
+        time.sleep(random.uniform(self.delay[0], self.delay[1]))
+
+    def _next_value(self):
+        """Returns the next random value to set."""
+        return random.randint(1, 10)
+
+    def _log(self, action, operation, *values):
+        """Logs an operation."""
+        self.history.record(ProcessEntry(self.id, action, operation, *values))
+
+    def _invoke(self, operation, *values):
+        """Logs an operation invocation event in the process history."""
+        self._log('invoke', operation, *values)
+
+    def _ok(self, operation, *values):
+        """Logs an operation success event in the process history."""
+        self._log('ok', operation, *values)
+        return True
+
+    def _fail(self, operation, *values):
+        """Logs an operation failure event in the process history."""
+        self._log('fail', operation, *values)
+        return True
+
+    def _info(self, operation, *values):
+        """Logs an operation info event in the process history and stops the process."""
+        self._log('info', operation, *values)
+        self.stop()
+        return False
+
+    def read(self):
+        """Executes a read operation."""
+        self._invoke('read', None)
+        try:
+            output = get_output([
+                'onos',
+                self.node,
+                'value-test',
+                self.name,
+                'get'
+            ], timeout=5).strip()
+            self.value = None if output == 'null' else int(output)
+            return self._ok('read', self.value)
+        except subprocess.CalledProcessError:
+            return self._info('read', None)
+
+    def write(self):
+        """Executes a write operation."""
+        next_value = self._next_value()
+        self._invoke('write', next_value)
+        try:
+            get_output([
+                'onos',
+                self.node,
+                'value-test',
+                self.name,
+                'set',
+                str(next_value)
+            ], timeout=5)
+            self.value = next_value
+            return self._ok('write', self.value)
+        except subprocess.CalledProcessError:
+            return self._info('write', next_value)
+
+    def cas(self):
+        """Executes a check-and-set operation."""
+        current_value, next_value = self.value, self._next_value()
+        self._invoke('cas', current_value, next_value)
+        try:
+            output = get_output([
+                'onos',
+                self.node,
+                'value-test',
+                self.name,
+                'compareAndSet',
+                str(current_value) if current_value is not None else 'null',
+                str(next_value)
+            ], timeout=5).strip()
+
+            if output == 'true':
+                self.value = next_value
+                return self._ok('cas', current_value, next_value)
+            elif output == 'false':
+                return self._fail('cas', current_value, next_value)
+            else:
+                return self._info('cas', current_value, next_value)
+        except subprocess.CalledProcessError:
+            return self._info('cas', current_value, next_value)
+
+    def read_and_cas(self):
+        """Executes a read to get the current value and then a check-and-set operation."""
+        if self.read():
+            self.cas()
+
+
+class Disruptor(Runnable):
+    """Cluster disruptor runner.
+
+    The disruptor periodically disrupts the cluster using a random disruptor function to e.g. partition the network,
+    crash a node, or slow communication within the network. Only one disruptor function runs at any given time, and
+    each disruption is healed before the next one begins. The disruptor sleeps for a uniform random interval between
+    disruptor functions.
+    """
+    def __init__(self, nodes, disruptors, delay, history):
+        super(Disruptor, self).__init__()
+        self.nodes = nodes
+        self.delay = delay
+        self.history = history
+        self.disruptors = []
+        for name in disruptors:
+            try:
+                self.disruptors.append(getattr(self, name))
+            except AttributeError:
+                print "Unknown disruptor %s" % (name,)
+                sys.exit(1)
+
+    def run(self):
+        """Runs the disruptor until stopped."""
+        while self.running:
+            self._wait()
+            if self.running:
+                self._run()
+
+    def _run(self):
+        """Runs a random disruptor."""
+        random.choice(self.disruptors)()
+
+    def _wait(self):
+        """Waits for a uniform random delay."""
+        time.sleep(random.uniform(self.delay[0], self.delay[1]))
+
+    def _random_node(self):
+        """Returns a random node on which to perform an operation."""
+        return random.choice(self.nodes)
+
+    def _log(self, event, message):
+        """Logs an event in the disruptor history."""
+        self.history.record(DisruptorEntry(event, message))
+
+    def _start(self, message):
+        """Logs a start event in the disruptor history."""
+        self._log('start', message)
+
+    def _stop(self, message):
+        """Logs a stop event in the disruptor history."""
+        self._log('stop', message)
+
+    def _disrupt(self, *args):
+        """Executes a disruptor via the onos-disrupt utility."""
+        call(['onos-disrupt'] + list(args), timeout=5)
+
+    def _partition(self, node1, node2):
+        """Partitions node1 from node2."""
+        self._disrupt(node1, 'partition', node2)
+
+    def _partition_halves(self):
+        """Partitions the cluster into two halves."""
+        self._disrupt('partition-halves')
+
+    def _partition_bridge(self, node):
+        """Partitions the cluster with the given node as a bridge between two halves."""
+        self._disrupt(node, 'partition-bridge')
+
+    def _heal(self, node1=None, node2=None):
+        """Heals a partition between two nodes or between all nodes if the given nodes are None."""
+        if node1 is not None and node2 is not None:
+            self._disrupt(node1, 'heal', node2)
+        else:
+            self._disrupt('heal')
+
+    def _crash(self, node):
+        """Crashes the given node."""
+        self._disrupt(node, 'crash')
+
+    def _recover(self, node):
+        """Recovers the given node from a crash."""
+        self._disrupt(node, 'recover')
+
+    def _delay(self, node=None):
+        """Delays communication from all nodes or from the given node if specified."""
+        if node is not None:
+            self._disrupt(node, 'delay')
+        else:
+            self._disrupt('delay')
+
+    def _restore(self, node=None):
+        """Restores communication on all nodes or on the given node if specified."""
+        if node is not None:
+            self._disrupt(node, 'restore')
+        else:
+            self._disrupt('restore')
+
+    def partition_random(self):
+        """Partitions two random nodes from each other."""
+        node1 = self._random_node()
+        node2 = node1
+        while node2 == node1:
+            node2 = self._random_node()
+        self._start("Cut off %s->%s" % (node1, node2))
+        self._partition(node1, node2)
+        self._wait()
+        self._heal(node1, node2)
+        self._stop("Fully connected")
+
+    def partition_halves(self):
+        """Partitions the cluster into two halves."""
+        self._start("Partitioning network into two halves")
+        self._partition_halves()
+        self._wait()
+        self._heal()
+        self._stop("Fully connected")
+
+    def partition_bridge(self):
+        """Partitions the cluster into two halves with a bridge between them."""
+        node = self._random_node()
+        self._start("Partitioning network with bridge %s" % (node,))
+        self._partition_bridge(node)
+        self._wait()
+        self._heal()
+        self._stop("Fully connected")
+
+    def crash_random(self):
+        """Crashes a random node."""
+        node = self._random_node()
+        self._start("Crashing %s" % (node,))
+        self._crash(node)
+        self._wait()
+        self._recover(node)
+        self._stop("Recovered %s" % (node,))
+
+    def delay(self):
+        """Delays messages on all nodes."""
+        self._start("Delay communication on all nodes")
+        self._delay()
+        self._wait()
+        self._restore()
+        self._stop("Communication restored")
+
+    def delay_random(self):
+        """Delays communication on a random node."""
+        node = self._random_node()
+        self._start("Delay communication on %s" % (node,))
+        self._delay(node)
+        self._wait()
+        self._restore(node)
+        self._stop("Communication restored on %s" % (node,))
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description="""
+        Perform random operations on a distributed primitive using multiple concurrent 
+        processes and verify the linearizability of the history.
+        """
+    )
+    parser.add_argument(
+        '-n',
+        '--primitive-name',
+        type=str,
+        default='test',
+        help="Name of the AtomicValue primitive to test. Defaults to 'test'"
+    )
+    parser.add_argument(
+        '-p',
+        '--parallelism',
+        type=int,
+        default=8,
+        help="Number of parallel processes with which to test. Defaults to 8"
+    )
+    parser.add_argument(
+        '-c',
+        '--operation-count',
+        type=int,
+        default=50,
+        help="Number of operations to execute per process. Defaults to 50 operations per process"
+    )
+    parser.add_argument(
+        '-od',
+        '--operation-delay',
+        type=int,
+        nargs=2,
+        default=[1, 5],
+        help="Uniform random delay to wait between operations. Defaults to 1-5 seconds per operation"
+    )
+    parser.add_argument(
+        '-d',
+        '--disruptors',
+        type=str,
+        nargs='*',
+        default=['partition_random'],
+        choices=['partition_random', 'partition_halves', 'partition_bridge', 'crash_random', 'delay', 'delay_random'],
+        help="""
+        List of disruptor functions to run. Disruptors will be run periodically 
+        according to the configured --disruptor-delay. Defaults to 'partition_random'
+        """
+    )
+    parser.add_argument(
+        '-dd',
+        '--disruptor-delay',
+        type=int,
+        nargs=2,
+        default=[15, 30],
+        help="Uniform random delay to wait between disruptor functions.Defaults to 15-30 seconds between disruptors"
+    )
+
+    args = parser.parse_args()
+
+    try:
+        run_test(
+            args.primitive_name,
+            args.parallelism,
+            args.operation_count,
+            args.operation_delay,
+            args.disruptors,
+            args.disruptor_delay
+        )
+    except KeyboardInterrupt:
+        sys.exit(1)