Pytest源码分析
By:授客 QQ:1033553122
测试环境
pytest 5.4.3
测试脚本mytest.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pytest
def test_func(): # test开头的测试函数
print("test_func")
assert 1 # 断言成功
if __name__ == '__main__':
pytest.main() # 执行测试
源码分析
测试脚本mytest.py
import pytest
运行pytest/__init__.py
,主要做了两件事情
- 从
_pytest
导入后续需要用的依赖包 - 通过
_pytest/compat.py
模块的_setup_collect_fakemodule()
建立一个伪模块pytest.collect
pytest/__init__.py
# PYTHON_ARGCOMPLETE_OK
"""
pytest: unit and functional testing with Python.
"""
from _pytest import __version__
from _pytest.assertion import register_assert_rewrite
from _pytest.compat import _setup_collect_fakemodule
from _pytest.config import cmdline
from _pytest.config import ExitCode
from _pytest.config import hookimpl
from _pytest.config import hookspec
from _pytest.config import main
from _pytest.config import UsageError
from _pytest.debugging import pytestPDB as __pytestPDB
from _pytest.fixtures import fillfixtures as _fillfuncargs
from _pytest.fixtures import fixture
from _pytest.fixtures import yield_fixture
from _pytest.freeze_support import freeze_includes
from _pytest.main import Session
from _pytest.mark import MARK_GEN as mark
from _pytest.mark import param
from _pytest.nodes import Collector
from _pytest.nodes import File
from _pytest.nodes import Item
from _pytest.outcomes import exit
from _pytest.outcomes import fail
from _pytest.outcomes import importorskip
from _pytest.outcomes import skip
from _pytest.outcomes import xfail
from _pytest.python import Class
from _pytest.python import Function
from _pytest.python import Instance
from _pytest.python import Module
from _pytest.python import Package
from _pytest.python_api import approx
from _pytest.python_api import raises
from _pytest.recwarn import deprecated_call
from _pytest.recwarn import warns
from _pytest.warning_types import PytestAssertRewriteWarning
from _pytest.warning_types import PytestCacheWarning
from _pytest.warning_types import PytestCollectionWarning
from _pytest.warning_types import PytestConfigWarning
from _pytest.warning_types import PytestDeprecationWarning
from _pytest.warning_types import PytestExperimentalApiWarning
from _pytest.warning_types import PytestUnhandledCoroutineWarning
from _pytest.warning_types import PytestUnknownMarkWarning
from _pytest.warning_types import PytestWarning
set_trace = __pytestPDB.set_trace
__all__ = [
"__version__",
"_fillfuncargs",
"approx",
"Class",
"cmdline",
"Collector",
"deprecated_call",
"exit",
"ExitCode",
"fail",
"File",
"fixture",
"freeze_includes",
"Function",
"hookimpl",
"hookspec",
"importorskip",
"Instance",
"Item",
"main",
"mark",
"Module",
"Package",
"param",
"PytestAssertRewriteWarning",
"PytestCacheWarning",
"PytestCollectionWarning",
"PytestConfigWarning",
"PytestDeprecationWarning",
"PytestExperimentalApiWarning",
"PytestUnhandledCoroutineWarning",
"PytestUnknownMarkWarning",
"PytestWarning",
"raises",
"register_assert_rewrite",
"Session",
"set_trace",
"skip",
"UsageError",
"warns",
"xfail",
"yield_fixture",
]
_setup_collect_fakemodule() # 建立一个伪模块`pytest.collect`
del _setup_collect_fakemodule
_pytest/compat.py
_setup_collect_fakemodule
函数
COLLECT_FAKEMODULE_ATTRIBUTES = (
"Collector",
"Module",
"Function",
"Instance",
"Session",
"Item",
"Class",
"File",
"_fillfuncargs",
)
def _setup_collect_fakemodule() -> None:
from types import ModuleType
import pytest
# Types ignored because the module is created dynamically.
pytest.collect = ModuleType("pytest.collect") # type: ignore
pytest.collect.__all__ = [] # type: ignore # used for setns
for attr_name in COLLECT_FAKEMODULE_ATTRIBUTES:
setattr(pytest.collect, attr_name, getattr(pytest, attr_name)) # type: ignore
测试脚本myptest.py
pytest.main()
这里的main
函数为从_pytest/config/__init__.py
定义的全局函数--main
函数
_pytest/config/__init__.py
_pytest/config/__init__.py
main
函数定义
主要用于获取Config
对象config
,然后通过config.hook.pytest_cmdline_main
执行测试
def main(args=None, plugins=None) -> Union[int, ExitCode]:
""" return exit code, after performing an in-process test run.
:arg args: list of command line arguments.
:arg plugins: list of plugin objects to be auto-registered during
initialization.
"""
try:
try:
config = _prepareconfig(args, plugins)
except ConftestImportFailure as e:
exc_info = ExceptionInfo(e.excinfo)
tw = TerminalWriter(sys.stderr)
tw.line(
"ImportError while loading conftest '{e.path}'.".format(e=e), red=True
)
exc_info.traceback = exc_info.traceback.filter(filter_traceback)
exc_repr = (
exc_info.getrepr(style="short", chain=False)
if exc_info.traceback
else exc_info.exconly()
)
formatted_tb = str(exc_repr)
for line in formatted_tb.splitlines():
tw.line(line.rstrip(), red=True)
return ExitCode.USAGE_ERROR
else:
try:
ret = config.hook.pytest_cmdline_main(
config=config
) # type: Union[ExitCode, int]
try:
return ExitCode(ret)
except ValueError:
return ret
finally:
config._ensure_unconfigure()
except UsageError as e:
tw = TerminalWriter(sys.stderr)
for msg in e.args:
tw.line("ERROR: {}
".format(msg), red=True)
return ExitCode.USAGE_ERROR
_pytest/config/__init__.py
_prepareconfig
函数定义
主要是获取并返回Config对象config
,该对象通过函数pluginmanager.hook.pytest_cmdline_parse
返回
def _prepareconfig(
args: Optional[Union[py.path.local, List[str]]] = None, plugins=None
):
if args is None:
args = sys.argv[1:]
elif isinstance(args, py.path.local):
args = [str(args)]
elif not isinstance(args, list):
msg = "`args` parameter expected to be a list of strings, got: {!r} (type: {})"
raise TypeError(msg.format(args, type(args)))
config = get_config(args, plugins)
pluginmanager = config.pluginmanager
try:
if plugins:
for plugin in plugins:
if isinstance(plugin, str):
pluginmanager.consider_pluginarg(plugin)
else:
pluginmanager.register(plugin)
return pluginmanager.hook.pytest_cmdline_parse(
pluginmanager=pluginmanager, args=args
)
except BaseException:
config._ensure_unconfigure()
raise
_pytest/config/__init__.py
get_config
函数定义
主要是构造Config
对象
# Plugins that cannot be disabled via "-p no:X" currently.
essential_plugins = (
"mark",
"main",
"runner",
"fixtures",
"helpconfig", # Provides -p.
)
default_plugins = essential_plugins + (
"python",
"terminal",
"debugging",
"unittest",
"capture",
"skipping",
"tmpdir",
"monkeypatch",
"recwarn",
"pastebin",
"nose",
"assertion",
"junitxml",
"resultlog",
"doctest",
"cacheprovider",
"freeze_support",
"setuponly",
"setupplan",
"stepwise",
"warnings",
"logging",
"reports",
"faulthandler",
)
builtin_plugins = set(default_plugins)
builtin_plugins.add("pytester")
def get_config(args=None, plugins=None):
# subsequent calls to main will create a fresh instance
pluginmanager = PytestPluginManager() # PytestPluginManager 继承于 PluginManager
config = Config(
pluginmanager,
invocation_params=Config.InvocationParams(
args=args or (), plugins=plugins, dir=Path().resolve()
),
)
if args is not None:
# Handle any "-p no:plugin" args.
pluginmanager.consider_preparse(args, exclude_only=True)
for spec in default_plugins:
pluginmanager.import_plugin(spec) # 为对象导入插件
return config
_pytest/config/__init__.py
Config
构造函数定义
构造函数参数pluginmanager
接收了外部传入的PytestPluginManager
实例对象,该参数被赋值给 self.pluginmanager
,同时初始化self.hook
值为self.pluginmanager.hook
,这样Config
对象就具备了pluggy
的插件管理及hook
能力,可通过Config对象.hook.hook函数
class Config:
# ... 略
def __init__(
self,
pluginmanager: PytestPluginManager,
*,
invocation_params: Optional[InvocationParams] = None,
) -> None:
from .argparsing import Parser, FILE_OR_DIR
if invocation_params is None:
invocation_params = self.InvocationParams(
args=(), plugins=None, dir=Path.cwd()
)
self.option = argparse.Namespace()
"""Access to command line option as attributes.
:type: argparse.Namespace
"""
self.invocation_params = invocation_params
"""The parameters with which pytest was invoked.
:type: InvocationParams
"""
_a = FILE_OR_DIR
self._parser = Parser(
usage=f"%(prog)s [options] [{_a}] [{_a}] [...]",
processopt=self._processopt,
)
self.pluginmanager = pluginmanager # 增加插件管理器
"""The plugin manager handles plugin registration and hook invocation.
:type: PytestPluginManager
"""
self.trace = self.pluginmanager.trace.root.get("config")
self.hook = self.pluginmanager.hook # 增加hook属性
self._inicache: Dict[str, Any] = {}
self._override_ini: Sequence[str] = ()
self._opt2dest: Dict[str, str] = {}
self._cleanup: List[Callable[[], None]] = []
# A place where plugins can store information on the config for their
# own use. Currently only intended for internal plugins.
self._store = Store()
self.pluginmanager.register(self, "pytestconfig")
self._configured = False
self.hook.pytest_addoption.call_historic(
kwargs=dict(parser=self._parser, pluginmanager=self.pluginmanager)
)
if TYPE_CHECKING:
from _pytest.cacheprovider import Cache
self.cache: Optional[Cache] = None
_pytest/config/__init__.py
PytestPluginManager
类
初始化时,通过self.add_hookspecs(_pytest.hookspec)
添加hook
函数声明(接口),同时通过self.register(self)
把自己注册为插件实现;
import_plugin
中拼接_pytest/config/__init__.py
中定义的模块,拼接后的形式,形如_pytest.python
,然后导入并注册插件
@final
class PytestPluginManager(PluginManager):
"""A :py:class:`pluggy.PluginManager <pluggy.PluginManager>` with
additional pytest-specific functionality:
* Loading plugins from the command line, ``PYTEST_PLUGINS`` env variable and
``pytest_plugins`` global variables found in plugins being loaded.
* ``conftest.py`` loading during start-up.
"""
def __init__(self) -> None:
import _pytest.assertion
super().__init__("pytest")
# The objects are module objects, only used generically.
self._conftest_plugins: Set[types.ModuleType] = set()
# State related to local conftest plugins.
self._dirpath2confmods: Dict[Path, List[types.ModuleType]] = {}
self._conftestpath2mod: Dict[Path, types.ModuleType] = {}
self._confcutdir: Optional[Path] = None
self._noconftest = False
self._duplicatepaths: Set[Path] = set()
# plugins that were explicitly skipped with pytest.skip
# list of (module name, skip reason)
# previously we would issue a warning when a plugin was skipped, but
# since we refactored warnings as first citizens of Config, they are
# just stored here to be used later.
self.skipped_plugins: List[Tuple[str, str]] = []
self.add_hookspecs(_pytest.hookspec)
self.register(self)
if os.environ.get("PYTEST_DEBUG"):
err: IO[str] = sys.stderr
encoding: str = getattr(err, "encoding", "utf8")
try:
err = open(
os.dup(err.fileno()),
mode=err.mode,
buffering=1,
encoding=encoding,
)
except Exception:
pass
self.trace.root.setwriter(err.write)
self.enable_tracing()
# Config._consider_importhook will set a real object if required.
self.rewrite_hook = _pytest.assertion.DummyRewriteHook()
# Used to know when we are importing conftests after the pytest_configure stage.
self._configured = False
def parse_hookimpl_opts(self, plugin: _PluggyPlugin, name: str):
# pytest hooks are always prefixed with "pytest_",
# so we avoid accessing possibly non-readable attributes
# (see issue #1073).
if not name.startswith("pytest_"):
return
# Ignore names which can not be hooks.
if name == "pytest_plugins":
return
method = getattr(plugin, name)
opts = super().parse_hookimpl_opts(plugin, name)
# Consider only actual functions for hooks (#3775).
if not inspect.isroutine(method):
return
# Collect unmarked hooks as long as they have the `pytest_' prefix.
if opts is None and name.startswith("pytest_"):
opts = {}
if opts is not None:
# TODO: DeprecationWarning, people should use hookimpl
# https://github.com/pytest-dev/pytest/issues/4562
known_marks = {m.name for m in getattr(method, "pytestmark", [])}
for name in ("tryfirst", "trylast", "optionalhook", "hookwrapper"):
opts.setdefault(name, hasattr(method, name) or name in known_marks)
return opts
def parse_hookspec_opts(self, module_or_class, name: str):
opts = super().parse_hookspec_opts(module_or_class, name)
if opts is None:
method = getattr(module_or_class, name)
if name.startswith("pytest_"):
# todo: deprecate hookspec hacks
# https://github.com/pytest-dev/pytest/issues/4562
known_marks = {m.name for m in getattr(method, "pytestmark", [])}
opts = {
"firstresult": hasattr(method, "firstresult")
or "firstresult" in known_marks,
"historic": hasattr(method, "historic")
or "historic" in known_marks,
}
return opts
def register(
self, plugin: _PluggyPlugin, name: Optional[str] = None
) -> Optional[str]:
if name in _pytest.deprecated.DEPRECATED_EXTERNAL_PLUGINS:
warnings.warn(
PytestConfigWarning(
"{} plugin has been merged into the core, "
"please remove it from your requirements.".format(
name.replace("_", "-")
)
)
)
return None
ret: Optional[str] = super().register(plugin, name)
if ret:
self.hook.pytest_plugin_registered.call_historic(
kwargs=dict(plugin=plugin, manager=self)
)
if isinstance(plugin, types.ModuleType):
self.consider_module(plugin)
return ret
# ...略
def import_plugin(self, modname: str, consider_entry_points: bool = False) -> None:
"""Import a plugin with ``modname``.
If ``consider_entry_points`` is True, entry point names are also
considered to find a plugin.
"""
# Most often modname refers to builtin modules, e.g. "pytester",
# "terminal" or "capture". Those plugins are registered under their
# basename for historic purposes but must be imported with the
# _pytest prefix.
assert isinstance(modname, str), (
"module name as text required, got %r" % modname
)
if self.is_blocked(modname) or self.get_plugin(modname) is not None:
return
importspec = "_pytest." + modname if modname in builtin_plugins else modname
self.rewrite_hook.mark_rewrite(importspec)
if consider_entry_points:
loaded = self.load_setuptools_entrypoints("pytest11", name=modname)
if loaded:
return
try:
__import__(importspec)
except ImportError as e:
raise ImportError(
'Error importing plugin "{}": {}'.format(modname, str(e.args[0]))
).with_traceback(e.__traceback__) from e
except Skipped as e:
self.skipped_plugins.append((modname, e.msg or ""))
else:
mod = sys.modules[importspec]
self.register(mod, modname)
这里重写了父类的register,如下,重写函数中也调用了父类的register函数
父类的register函数中,调用了self.parse_hookimpl_opts(plugin, name)
,这个函数在当前类即PytestPluginManager
类中重写了,所以,运行时调用的是重写后的PytestPluginManager.parse_hookimpl_opts(plugin, name)
,该函数中,也会调用PluginManager.parse_hookimpl_opts
函数,如果调用该父类函数获取返回值为None,并且函数名称以pytest__
开头,则标记返回结果值为 {}
,这样PluginManager.register
函数中,hookimpl_opts is not None
表达式值为真,会继续往下执行代码,将没有使用hookimpl
标记的,以pytest__
打头的函数添加为对应hook函数的函数实现体。
pluggy/manage.py
PluginManager
类
class PluginManager(object):
# ...略
def register(self, plugin, name=None):
""" Register a plugin and return its canonical name or None if the name
is blocked from registering. Raise a ValueError if the plugin is already
registered. """
plugin_name = name or self.get_canonical_name(plugin)
if plugin_name in self._name2plugin or plugin in self._plugin2hookcallers:
if self._name2plugin.get(plugin_name, -1) is None:
return # blocked plugin, return None to indicate no registration
raise ValueError(
"Plugin already registered: %s=%s
%s"
% (plugin_name, plugin, self._name2plugin)
)
# XXX if an error happens we should make sure no state has been
# changed at point of return
self._name2plugin[plugin_name] = plugin
# register matching hook implementations of the plugin
self._plugin2hookcallers[plugin] = hookcallers = []
for name in dir(plugin):
hookimpl_opts = self.parse_hookimpl_opts(plugin, name)
if hookimpl_opts is not None:
normalize_hookimpl_opts(hookimpl_opts)
method = getattr(plugin, name)
hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts)
hook = getattr(self.hook, name, None)
if hook is None:
hook = _HookCaller(name, self._hookexec)
setattr(self.hook, name, hook)
elif hook.has_spec():
self._verify_hook(hook, hookimpl)
hook._maybe_apply_history(hookimpl)
hook._add_hookimpl(hookimpl)
hookcallers.append(hook)
return plugin_name
_pytest/config/__init__.py
main
函数定义
获取Config
对象config
后,通过调用config.hook.pytest_cmdline_main
,从上到下,执行以下.py
脚本中的pytest_cmdline_main
函数
_pytest/setupplan.py
_pytest/setuponly.py
_pytest/mark/__init__.py
_pytest/cacheprovider.py
_python/python
_python/helpconfig
_python/main.py
_python/main.py
该文件中的 pytest_cmdline_main
函数,负责执行测试
def pytest_cmdline_main(config):
return wrap_session(config, _main)
def wrap_session(
config: Config, doit: Callable[[Config, "Session"], Optional[Union[int, ExitCode]]]
) -> Union[int, ExitCode]:
"""Skeleton command line program"""
session = Session.from_config(config)
session.exitstatus = ExitCode.OK
initstate = 0
try:
try:
config._do_configure()
initstate = 1
config.hook.pytest_sessionstart(session=session)
initstate = 2
session.exitstatus = doit(config, session) or 0
except UsageError:
session.exitstatus = ExitCode.USAGE_ERROR
raise
except Failed:
session.exitstatus = ExitCode.TESTS_FAILED
except (KeyboardInterrupt, exit.Exception):
excinfo = _pytest._code.ExceptionInfo.from_current()
exitstatus = ExitCode.INTERRUPTED # type: Union[int, ExitCode]
if isinstance(excinfo.value, exit.Exception):
if excinfo.value.returncode is not None:
exitstatus = excinfo.value.returncode
if initstate < 2:
sys.stderr.write(
"{}: {}
".format(excinfo.typename, excinfo.value.msg)
)
config.hook.pytest_keyboard_interrupt(excinfo=excinfo)
session.exitstatus = exitstatus
except: # noqa
session.exitstatus = ExitCode.INTERNAL_ERROR
excinfo = _pytest._code.ExceptionInfo.from_current()
try:
config.notify_exception(excinfo, config.option)
except exit.Exception as exc:
if exc.returncode is not None:
session.exitstatus = exc.returncode
sys.stderr.write("{}: {}
".format(type(exc).__name__, exc))
else:
if excinfo.errisinstance(SystemExit):
sys.stderr.write("mainloop: caught unexpected SystemExit!
")
finally:
# Explicitly break reference cycle.
excinfo = None # type: ignore
session.startdir.chdir()
if initstate >= 2:
try:
config.hook.pytest_sessionfinish(
session=session, exitstatus=session.exitstatus
)
except exit.Exception as exc:
if exc.returncode is not None:
session.exitstatus = exc.returncode
sys.stderr.write("{}: {}
".format(type(exc).__name__, exc))
config._ensure_unconfigure()
return session.exitstatus
def _main(config: Config, session: "Session") -> Optional[Union[int, ExitCode]]:
""" default command line protocol for initialization, session,
running tests and reporting. """
config.hook.pytest_collection(session=session)
config.hook.pytest_runtestloop(session=session)
if session.testsfailed:
return ExitCode.TESTS_FAILED
elif session.testscollected == 0:
return ExitCode.NO_TESTS_COLLECTED
return None
def pytest_collection(session):
return session.perform_collect()
def pytest_runtestloop(session):
if session.testsfailed and not session.config.option.continue_on_collection_errors:
raise session.Interrupted(
"%d error%s during collection"
% (session.testsfailed, "s" if session.testsfailed != 1 else "")
)
if session.config.option.collectonly:
return True
for i, item in enumerate(session.items): # session.items 获取值为为pytest测试脚本中的测试函数
nextitem = session.items[i + 1] if i + 1 < len(session.items) else None
item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
if session.shouldfail:
raise session.Failed(session.shouldfail)
if session.shouldstop:
raise session.Interrupted(session.shouldstop)
return True
item.config.hook.pytest_runtest_protocol
执行顺序如下,从上到下执行各个脚本中对应的函数
pytest_runtest_protocol warnings.py
pytest_runtest_protocol assertion/__init__.py
pytest_runtest_protocol faulthandler
pytest_runtest_protocol unittest.py
pytest_runtest_protocol runner.py
_pytest/runner.py
pytest_runtest_protocol 负责执行pytest协议
def pytest_runtest_protocol(item, nextitem):
item.ihook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location)
runtestprotocol(item, nextitem=nextitem)
item.ihook.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location)
return True
def runtestprotocol(item, log=True, nextitem=None):
hasrequest = hasattr(item, "_request")
if hasrequest and not item._request:
item._initrequest()
rep = call_and_report(item, "setup", log)
reports = [rep]
if rep.passed:
if item.config.getoption("setupshow", False):
show_test_item(item)
if not item.config.getoption("setuponly", False):
reports.append(call_and_report(item, "call", log))
reports.append(call_and_report(item, "teardown", log, nextitem=nextitem))
# after all teardown hooks have been called
# want funcargs and request info to go away
if hasrequest:
item._request = False
item.funcargs = None
return reports