Developer Guide

The packages for this pytest plugin are divided into three main modules:

  1. coordinator - for coordinating parallel execution from a coordinator process to multiple worker processes

  2. worker - contains code for processing tests as they are popped off of the queue (a queue fed by the coordinator)

  3. plugin - contains the pytest standard hooks for processing pytest arguments and performing actions at key points along the test execution flow

The system starts by launching all worker processes. Each worker collects the full list of available tests using standard pytest code. This may seem inefficient in terms of CPU usage, but since each process runs on its own core, very little overall time is wasted. The reason for doing this is that pytest items are not serializable and so cannot be sent across a queue; only test nodeids are sent, and each worker uses them to look up the corresponding item.
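The nodeid lookup described above can be sketched as follows. This is a minimal illustration, not the plugin's actual code: FakeItem and build_lookup are hypothetical names, and the only assumption carried over from pytest is that every item exposes a string nodeid.

```python
from dataclasses import dataclass


@dataclass
class FakeItem:
    """Hypothetical stand-in for a pytest item (real items cannot be sent over a queue)."""
    nodeid: str


def build_lookup(items):
    """Map each nodeid string to the item collected locally by this worker."""
    return {item.nodeid: item for item in items}


# Every worker runs the same collection step on its own ...
collected = [
    FakeItem("tests/test_a.py::test_one"),
    FakeItem("tests/test_a.py::test_two"),
]
lookup = build_lookup(collected)

# ... so only a plain string has to cross the queue.
received_nodeid = "tests/test_a.py::test_two"
item = lookup[received_nodeid]
```

Because a nodeid is an ordinary string, it can be placed on a queue without any custom serialization.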

The system functions by having the coordinator node push all tests to a queue shared with all workers. Each worker then grabs the next available test from the queue for execution. This pull model spares the developer from having to figure out how to split up their tests for execution. Ideally, longer-running tests would be executed first and short tests last, to best distribute the workload.
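To see why running long tests first helps under a pull model, the following sketch simulates idle workers taking the next test in queue order. It is a toy model under stated assumptions: simulate_pull and the durations are made up for illustration, and no real processes or queues are involved.

```python
import heapq


def simulate_pull(durations, num_workers):
    """Return total wall time when each idle worker pulls the next test in queue order."""
    # Each heap entry is the time at which one worker becomes free.
    free_at = [0.0] * num_workers
    heapq.heapify(free_at)
    for duration in durations:
        start = heapq.heappop(free_at)  # the earliest idle worker takes the next test
        heapq.heappush(free_at, start + duration)
    return max(free_at)


durations = [1, 1, 1, 1, 8]  # hypothetical test run times
short_first = simulate_pull(durations, num_workers=2)
long_first = simulate_pull(sorted(durations, reverse=True), num_workers=2)
```

In this example the long test queued last leaves one worker idle while the other finishes it, whereas queuing it first lets the second worker absorb all the short tests in the meantime.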

A Note about Running Tests for pytest_mproc

Please note that one of the tests for this product is designed to fail deliberately. This is to test that test status is reported properly for failed tests when using this pytest plugin.

Coordinator

This package contains code to coordinate execution from a main thread to worker processes.

class pytest_mproc.coordinator.Coordinator

Context manager for kicking off worker processes to conduct test execution via pytest hooks. Coordinators are scoped to a node and only handle a collection of workers on that node.

kill() -> None

Abruptly kill all worker processes

shutdown(timeout: Optional[float] = None) -> None

Shut down servers and clean up.

Parameters:

timeout – how long to wait before giving up if shutdown is taking too long

Raises:

TimeoutError – if shutdown takes too long

classmethod singleton() -> Coordinator
Returns:

sole (possibly created) singleton instance of this class
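A "possibly created" singleton accessor of this kind is commonly written as a lazily initialized class attribute. The sketch below shows that general pattern only; DemoCoordinator is a hypothetical class and is not the plugin's Coordinator implementation.

```python
from typing import Optional


class DemoCoordinator:
    """Hypothetical class used only to illustrate a lazily created singleton."""

    _instance: Optional["DemoCoordinator"] = None

    @classmethod
    def singleton(cls) -> "DemoCoordinator":
        # Create the instance on first use, then keep returning the same object.
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance


first = DemoCoordinator.singleton()
second = DemoCoordinator.singleton()
```

Both calls return the same object, so callers on one node share a single coordinator.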

start_workers(uri: str, num_processes: int) -> None

Start the worker processes

Parameters:
  • uri – uri of server that manages the test and result queues

  • num_processes – number of worker processes to launch

Worker

class pytest_mproc.worker.WorkerSession(index, is_remote: bool, test_q: JoinableQueue, result_q: Queue)

Handles reporting of test status and the like

pytest_collection_finish(session)

Invoked once pytest has collected all information about which tests to run. Those items are squirrelled away in a different attribute, and a generator is put in their place to draw from.

Parameters:

session – the pytest session

Returns:

the generator of tests to run

pytest_runtest_logreport(report)

Report only the status of calls (not setup or teardown), unless there was an error in setup or teardown. Stash the report for later end-game parsing of failures.

Parameters:

report – report to draw info from
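The filtering rule can be sketched as a small predicate. This is an illustration under assumptions, not the plugin's code: should_report is a hypothetical helper, the reports are SimpleNamespace stand-ins, and treating a setup or teardown outcome of "failed" as the error case is an assumption. The when and outcome attributes themselves mirror those on pytest's test reports.

```python
from types import SimpleNamespace


def should_report(report) -> bool:
    """Keep 'call' reports always; keep setup/teardown reports only if they failed."""
    return report.when == "call" or report.outcome == "failed"


# Stand-in reports for one passing test and one test whose setup failed.
reports = [
    SimpleNamespace(nodeid="test_ok", when="setup", outcome="passed"),
    SimpleNamespace(nodeid="test_ok", when="call", outcome="passed"),
    SimpleNamespace(nodeid="test_ok", when="teardown", outcome="passed"),
    SimpleNamespace(nodeid="test_bad", when="setup", outcome="failed"),
]

kept = [(r.nodeid, r.when) for r in reports if should_report(r)]
```

Only the call phase of the passing test and the failed setup of the other test survive the filter.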

session_finish()

Output failure information and the final exit status back to the coordinator

classmethod start(uri: str, executable: str) -> Popen
Parameters:
  • uri – uri of main server

  • executable – which executable to run under

Returns:

Process created for new worker

test_loop(session)

This is where the action takes place. We override the usual implementation since it doesn’t support dynamic generation of tests (the source here is the test queue, from which the next test is drawn).

Parameters:

session – Where the tests generator is kept
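A queue-backed generator of the kind described can be sketched as below. The names are hypothetical, the None sentinel that ends the loop is an assumption about how the end of the test stream might be signalled, and a thread-safe queue.Queue stands in for the multiprocessing queue to keep the example self-contained.

```python
import queue


def tests_from_queue(test_q, lookup, sentinel=None):
    """Yield locally collected items for nodeids pulled from the queue."""
    while True:
        nodeid = test_q.get()  # blocks until the coordinator supplies more work
        if nodeid is sentinel:  # assumed end-of-stream marker
            return
        yield lookup[nodeid]


# Stand-in queue contents and lookup table; real entries would be pytest items.
test_q = queue.Queue()
for entry in ["test_a", "test_b", None]:
    test_q.put(entry)
lookup = {"test_a": "item A", "test_b": "item B"}

# The loop does not know the full list up front; it runs whatever arrives next.
executed = [item for item in tests_from_queue(test_q, lookup)]
```

Since the set of tests is only discovered as the queue is drained, a loop built on a fixed, pre-known list of items would not fit, which is the motivation given above for overriding the default.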