django源码分析

本文环境python3.5.2,django1.10.x系列

django源码分析-test命令分析

Django项目中提供了,test命令行命令来执行django的单元测试,该测试用例就是测试自己编写的api测试,用来测试自己编写的代码是否正确,当我们修改旧的代码时,通过运行测试来确保不会影响到旧的逻辑,单元测试是Django项目中很重要的一部分。本文按照mysql案例进行说明,相关的数据库配置如下:

DATABASES = {'default': {'ENGINE': 'django.db.backends.mysql','NAME': 'project','USER': 'test','PASSWORD': 'test','HOST': '127.0.0.1','PORT': '3306',"ATOMIC_REQUESTS": True,"TEST": {"NAME": "test_db","CHARSET": "utf8","COLLATION": "utf8_general_ci"}
}

本文的test用例代码,位于项目的tests.py文件中,示例代码如下;

class ApplicationListAPITest(TestCase):def setUp(self):self.user = self.create_user()def create_user(self, email=None, **kwargs):name = kwargs.get("name") if kwargs.get("name", None) else emailpassword = md5hex(kwargs.get('password', 'password'))is_authorize = kwargs.get('is_authorize', False)kwargs.setdefault('account', email)kwargs.setdefault('name', name)kwargs.setdefault('type', user_type)kwargs.setdefault('is_staff', False)kwargs.setdefault('mobile', '12345678901')user = User(**kwargs)user.set_password(password)user.save()return userdef login_as(self, user, password='password'):return self.client.login(username=user.name, password=password)def test_list(self):self.login_as(self.user)url = '/test'resp = self.client.get(url)ret_data = resp.dataself.assertEqual(resp.status_code, 200)

test的执行概述

在命令行中输入的测试,可能会有好几种情况出现,比如如下几种基本情况;

python manage.py test  # 执行所有项目下的test.py中的测试用例
python manage.py test app_name.tests  # 执行app_name下的tests中的测试用例 效果等同于app_name
python manage.py test app_name.tests. ApplicationListAPITest    # 执行app_name.tests文件中的ApplicationListAPITest测试用例
python manage.py test app_name.tests. ApplicationListAPITest.test_list      # 执行app_name.tests文件中的ApplicationListAPITest的test_list方法

鉴于输入的情况较多和平常使用的具体环境,通过分析其中的任意一个的执行流程就可知其他几种情况的执行过程,故就举例使用python manage.py test 来描述执行test的过程,首先查看test中的Command的handle处理。

def handle(self, *test_labels, **options):from django.conf import settings                                            # 导入配置文件from django.test.utils import get_runner                                    # 获取test的执行类TestRunner = get_runner(settings, options['testrunner'])                    # 导入testrunnerif options['liveserver'] is not None:                                       # 判断是否使用liveserver 是否正真启动一个django serveros.environ['DJANGO_LIVE_TEST_SERVER_ADDRESS'] = options['liveserver']del options['liveserver']test_runner = TestRunner(**options)                                         # 初始化TestRunnerfailures = test_runner.run_tests(test_labels)                               # 运行测试用例if failures:sys.exit(bool(failures))                                                # 如果有失败返回失败信息

首先查看get_runner函数,该函数主要就是导入测试用的testrunner类,

def get_runner(settings, test_runner_class=None):if not test_runner_class:                                           # 如果没有传入test_runner_classtest_runner_class = settings.TEST_RUNNER                        # 使用配置文件中的TEST_RUNNER,在都不配置的情况下默认使用django.test.runner.DiscoverRunnertest_path = test_runner_class.split('.')                    # Allow for Python 2.5 relative pathsif len(test_path) > 1:test_module_name = '.'.join(test_path[:-1])else:test_module_name = '.'test_module = __import__(test_module_name, {}, {}, force_str(test_path[-1]))        # 导入该runner类的moduletest_runner = getattr(test_module, test_path[-1])                                   # 获取该类return test_runner                                                                  # 返回该类

由于在都不配置的情况下,使用django.test.runner.DiscoverRunner类,本文就按照该类来分析,在初始化该实例后,就调用了run_tests方法来执行所有的测试,

def run_tests(self, test_labels, extra_tests=None, **kwargs):"""Run the unit tests for all the test labels in the provided list.Test labels should be dotted Python paths to test modules, testclasses, or test methods.A list of 'extra' tests may also be provided; these testswill be added to the test suite.Returns the number of tests that failed."""self.setup_test_environment()                           # 建立测试环境suite = self.build_suite(test_labels, extra_tests)      # 建立测试的suite,这是将所有的测试进行包装,然后包装成一个suiteold_config = self.setup_databases()                     # 建立测试数据库result = self.run_suite(suite)                          # 执行所有找到的测试用例self.teardown_databases(old_config)                     # 摧毁数据库self.teardown_test_environment()                        # 摧毁测试环境return self.suite_result(suite, result)                 # 包装返回测试结果

首先来查看self.setup_test_environment建立测试环境的过程;

def setup_test_environment(self, **kwargs):setup_test_environment()                # 建立测试环境settings.DEBUG = False                  # 设置运行环境debug为Falseunittest.installHandler()               # 注册handler...def setup_test_environment():"""Perform global pre-test setup, such as installing the instrumented templaterenderer and setting the email backend to the locmem email backend."""Template._original_render = Template._render                                # 配置renderTemplate._render = instrumented_test_render             # Storing previous values in the settings module itself is problematic.# Store them in arbitrary (but related) modules instead. See #20636.mail._original_email_backend = settings.EMAIL_BACKEND                       # 配置邮件settings.EMAIL_BACKEND = 'django.core.mail.backends.locmem.EmailBackend'request._original_allowed_hosts = settings.ALLOWED_HOSTS                    # 配置hostssettings.ALLOWED_HOSTS = ['*']                                              # 测试环境下允许所有host连接mail.outbox = []deactivate()...def installHandler():
global _interrupt_handler
if _interrupt_handler is None:default_handler = signal.getsignal(signal.SIGINT)                           # 获取SIGINT_interrupt_handler = _InterruptHandler(default_handler)                     # 包装default_handlersignal.signal(signal.SIGINT, _interrupt_handler)                            # 注册信号与对应的处理函数

主要就是重新配置了运行的模式,添加了所有的hosts访问,注册了相关handler的处理函数。

当处理完成初始环境后,就来执行比较重要的build_suite的过程了,

self.build_suite(test_labels, extra_tests)

该函数的执行过程如下;

def build_suite(self, test_labels=None, extra_tests=None, **kwargs):suite = self.test_suite()                                                   # unittest.TestSuite的初始化test_labels = test_labels or ['.']                                          # 由于没有传入参数,test_labels为['.']extra_tests = extra_tests or []                                             # extra_tests默认为空       discover_kwargs = {}                                                        # 保存查找的匹配模式和查找目录等信息if self.pattern is not None:                                                # 如果不为空discover_kwargs['pattern'] = self.pattern                               # 使用该pattern 默认的为test*.pyif self.top_level is not None:discover_kwargs['top_level_dir'] = self.top_level                       # 查找的目录for label in test_labels:                                                   # 遍历kwargs = discover_kwargs.copy()                                         # 拷贝一份tests = Nonelabel_as_path = os.path.abspath(label)                                  # 获取label所在的目录的绝对地址# if a module, or "module.ClassName[.method_name]", just run thoseif not os.path.exists(label_as_path):                                   # 如果不存在则代表是使用了类名或者方法作为参数tests = self.test_loader.loadTestsFromName(label)                   # 加载查找suiteelif os.path.isdir(label_as_path) and not self.top_level:               # 如果是目录 并且top_level为空# Try to be a bit smarter than unittest about finding the# default top-level for a given directory path, to avoid# breaking relative imports. (Unittest's default is to set# top-level equal to the path, which means relative imports# will result in "Attempted relative import in non-package.").# We'd be happy to skip this and require dotted module paths# (which don't cause this problem) instead of file paths (which# do), but in the case of a directory in the cwd, which would# be equally valid if considered as a top-level module or as a# directory path, unittest unfortunately prefers the latter.top_level = label_as_path                                           # 获取当前的目录while True:init_py = os.path.join(top_level, '__init__.py')                # 加入__init__.pyif os.path.exists(init_py):                                     # 检查包的__init__.py是否存在try_next = os.path.dirname(top_level)                       # 存在则获取目录下的名称if try_next == top_level:                                   # 如果和上一级目录相同则停止# __init__.py all the way down? give up.breaktop_level = try_next                                        # 否则继续深入查找continuebreakkwargs['top_level_dir'] = top_level                                 # 赋值top_level_dir if not (tests and tests.countTestCases()) and is_discoverable(label):   # 如果没有测试tests 并且是可以查找的目录# Try discovery if path is a package or directorytests = self.test_loader.discover(start_dir=label, **kwargs)        # 查找目录里面的tests# Make unittest forget the top-level dir it calculated from this# run, to support running tests from two different top-levels.self.test_loader._top_level_dir = None                              # 置空suite.addTests(tests)                                                   # 将找到的tests添加到suite中for test in extra_tests:suite.addTest(test)if self.tags or self.exclude_tags:                                          # 是否有tagsuite = filter_tests_by_tags(suite, self.tags, self.exclude_tags)       # 找到有tag的testssuite = reorder_suite(suite, self.reorder_by, self.reverse)                 # 按照reorder_by排序testif self.parallel > 1:parallel_suite = self.parallel_test_suite(suite, self.parallel, self.failfast)# Since tests are distributed across processes on a per-TestCase# basis, there's no need for more processes than TestCases.parallel_units = len(parallel_suite.subsuites)if self.parallel > parallel_units:self.parallel = parallel_units# If there's only one TestCase, parallelization isn't needed.if self.parallel > 1:suite = parallel_suitereturn suite                                                                # 返回数据

先是查找对应的匹配的表达式,然后根据输入参数查找对应的目录,由于此时没有带具体的类名的参数输入,所以会执行到如下代码,去检查目录下所有的文件;

# Try discovery if path is a package or directorytests = self.test_loader.discover(start_dir=label, **kwargs)

其中的test_loader就是unittest.defaultTestLoader,调用该实例的discover方法,

def discover(self, start_dir, pattern='test*.py', top_level_dir=None):"""Find and return all test modules from the specified startdirectory, recursing into subdirectories to find them and return alltests found within them. Only test files that match the pattern willbe loaded. (Using shell style pattern matching.)All test modules must be importable from the top level of the project.If the start directory is not the top level directory then the toplevel directory must be specified separately.If a test package name (directory with '__init__.py') matches thepattern then the package will be checked for a 'load_tests' function. Ifthis exists then it will be called with (loader, tests, pattern) unlessthe package has already had load_tests called from the same discoveryinvocation, in which case the package module object is not scanned fortests - this ensures that when a package uses discover to furtherdiscover child tests that infinite recursion does not happen.If load_tests exists then discovery does *not* recurse into the package,load_tests is responsible for loading all tests in the package.The pattern is deliberately not stored as a loader attribute so thatpackages can continue discovery themselves. top_level_dir is stored soload_tests does not need to pass this argument in to loader.discover().Paths are sorted before being imported to ensure reproducible executionorder even on filesystems with non-alphabetical ordering like ext3/4."""set_implicit_top = Falseif top_level_dir is None and self._top_level_dir is not None:               #由于此时传入的top_level_dir不为空# make top_level_dir optional if called from load_tests in a packagetop_level_dir = self._top_level_direlif top_level_dir is None:set_implicit_top = Truetop_level_dir = start_dirtop_level_dir = os.path.abspath(top_level_dir)                              # 直接获取目录的绝对路径if not top_level_dir in sys.path:                                           # 如果该目录不再系统路径中则加入# all test modules must be importable from the top level directory# should we *unconditionally* put the start directory in first# in sys.path to minimise likelihood of conflicts between installed# modules and development versions?sys.path.insert(0, top_level_dir)self._top_level_dir = top_level_dir                                         # 设置目录名is_not_importable = Falseis_namespace = Falsetests = []if os.path.isdir(os.path.abspath(start_dir)):                               # 检查开始的目录是否为文件夹start_dir = os.path.abspath(start_dir)                                  # 获取开始的目录的绝对地址if start_dir != top_level_dir:                                          # 如果不相同is_not_importable = not os.path.isfile(os.path.join(start_dir, '__init__.py'))  # 判断是否拥有__init__.py文件,即判断是否是模块else:# support for discovery from dotted module namestry:__import__(start_dir)                                               # 直接导入except ImportError:is_not_importable = True                                            # 设置不能导入的标志位else:the_module = sys.modules[start_dir]                                 # 获取该moduletop_part = start_dir.split('.')[0]                                  # 获取顶级目录try:start_dir = os.path.abspath(os.path.dirname((the_module.__file__)))                      # 获取绝对目录except AttributeError:# look for namespace packagestry:spec = the_module.__spec__except AttributeError:spec = Noneif spec and spec.loader is None:if spec.submodule_search_locations is not None:is_namespace = Truefor path in the_module.__path__:if (not set_implicit_top andnot path.startswith(top_level_dir)):continueself._top_level_dir = \(path.split(the_module.__name__.replace(".", os.path.sep))[0])tests.extend(self._find_tests(path,pattern,namespace=True))elif the_module.__name__ in sys.builtin_module_names:# builtin moduleraise TypeError('Can not use builtin modules ''as dotted module names') from Noneelse:raise TypeError('don\'t know how to discover from {!r}'.format(the_module)) from Noneif set_implicit_top:if not is_namespace:self._top_level_dir = \self._get_directory_containing_module(top_part)sys.path.remove(top_level_dir)else:sys.path.remove(top_level_dir)if is_not_importable:                                                           # 判断是否可导入  如果不能导入则报错raise ImportError('Start directory is not importable: %r' % start_dir)if not is_namespace:tests = list(self._find_tests(start_dir, pattern))                          # 查找该目录下的所有testsreturn self.suiteClass(tests)                                                   # 添加tests

先判断是否能够导入,导入后调用self._find_tests方法来查找tests,

def _find_tests(self, start_dir, pattern, namespace=False):"""Used by discovery. Yields test suites it loads."""# Handle the __init__ in this packagename = self._get_name_from_path(start_dir)                                  # 获取开始的 name 此时为 '.'# name is '.' when start_dir == top_level_dir (and top_level_dir is by# definition not a package).if name != '.' and name not in self._loading_packages:                      # 判断name不为 '.' 并且name不再已经导入的packages中# name is in self._loading_packages while we have called into# loadTestsFromModule with name.tests, should_recurse = self._find_test_path(start_dir, pattern, namespace)                                      # 根据dir pattern namespace查找testsif tests is not None:                                                   # 如果tests不为空则返回yield tests                                                         if not should_recurse:                                                  # 如果为False则出了不能导入的错误# Either an error occurred, or load_tests was used by the# package.return# Handle the contents.paths = sorted(os.listdir(start_dir))                                       # 获取当前的目录下的所有文件或文件夹for path in paths:                                                          # 遍历所有子文件或目录full_path = os.path.join(start_dir, path)                               # 获取全路径tests, should_recurse = self._find_test_path(full_path, pattern, namespace)                                      # 查找testsif tests is not None:                                                   # 如果不为空则返回yield testsif should_recurse:                                                      # we found a package that didn't use load_tests.name = self._get_name_from_path(full_path)                          # 获取nameself._loading_packages.add(name)                                    # 添加到导入的packages集合中try:yield from self._find_tests(full_path, pattern, namespace)      # 继续递归遍历finally:self._loading_packages.discard(name)                            # 删除该name

主要是通过了递归调用来查找是否有tests内容,主要是通过self._find_test_path函数来查找tests,

def _find_test_path(self, full_path, pattern, namespace=False):"""Used by discovery.Loads tests from a single file, or a directories' __init__.py whenpassed the directory.Returns a tuple (None_or_tests_from_file, should_recurse)."""basename = os.path.basename(full_path)                                  # 获取文件夹名称if os.path.isfile(full_path):                                           # 检查是否是文件if not VALID_MODULE_NAME.match(basename):                           # 检查是否是合法的名称# valid Python identifiers onlyreturn None, Falseif not self._match_path(basename, full_path, pattern):              # 检查是否配置patternreturn None, False                                              # 没有匹配上返回None False# if the test file matches, load itname = self._get_name_from_path(full_path)                          # 重新获取路径try:module = self._get_module_from_name(name)                       # 导入该moduleexcept case.SkipTest as e:return _make_skipped_test(name, e, self.suiteClass), Falseexcept:error_case, error_message = \_make_failed_import_test(name, self.suiteClass)self.errors.append(error_message)return error_case, Falseelse:mod_file = os.path.abspath(getattr(module, '__file__', full_path))                             realpath = _jython_aware_splitext(os.path.realpath(mod_file))fullpath_noext = _jython_aware_splitext(os.path.realpath(full_path))if realpath.lower() != fullpath_noext.lower():module_dir = os.path.dirname(realpath)mod_name = _jython_aware_splitext(os.path.basename(full_path))expected_dir = os.path.dirname(full_path)msg = ("%r module incorrectly imported from %r. Expected ""%r. Is this module globally installed?")raise ImportError(msg % (mod_name, module_dir, expected_dir))return self.loadTestsFromModule(module, pattern=pattern), False     # 调用从Module中查找testselif os.path.isdir(full_path):                                              # 如果是文件夹if (not namespace andnot os.path.isfile(os.path.join(full_path, '__init__.py'))):return None, False                                                  # 如果没有namespace 并没有__init__.py文件则返回空load_tests = Nonetests = Nonename = self._get_name_from_path(full_path)                              # 重新设置路径名try:package = self._get_module_from_name(name)                          # 获取moduleexcept case.SkipTest as e:return _make_skipped_test(name, e, self.suiteClass), Falseexcept:error_case, error_message = \_make_failed_import_test(name, self.suiteClass)self.errors.append(error_message)return error_case, Falseelse:load_tests = getattr(package, 'load_tests', None)                   # 获取load_tests属性# Mark this package as being in load_tests (possibly ;))    self._loading_packages.add(name)                                    # 添加到loading_packagetry:tests = self.loadTestsFromModule(package, pattern=pattern)      # 从module中导入testsif load_tests is not None:# loadTestsFromModule(package) has loaded tests for us.return tests, Falsereturn tests, True                                              # 返回找到的testsfinally:self._loading_packages.discard(name)                            # 删除nameelse:return None, False

主要是判断了是否是文件或者是否是文件夹,然后导入对应的module然后调用self.loadTestsFromModule去查找对应的tests,

def loadTestsFromModule(self, module, *args, pattern=None, **kws):"""Return a suite of all tests cases contained in the given module"""# This method used to take an undocumented and unofficial# use_load_tests argument.  For backward compatibility, we still# accept the argument (which can also be the first position) but we# ignore it and issue a deprecation warning if it's present.if len(args) > 0 or 'use_load_tests' in kws:                                # 判断传入参数或者是否有warnings.warn('use_load_tests is deprecated and ignored',DeprecationWarning)kws.pop('use_load_tests', None)if len(args) > 1:# Complain about the number of arguments, but don't forget the# required `module` argument.complaint = len(args) + 1raise TypeError('loadTestsFromModule() takes 1 positional argument but {} were given'.format(complaint))if len(kws) != 0:# Since the keyword arguments are unsorted (see PEP 468), just# pick the alphabetically sorted first argument to complain about,# if multiple were given.  At least the error message will be# predictable.complaint = sorted(kws)[0]raise TypeError("loadTestsFromModule() got an unexpected keyword argument '{}'".format(complaint))tests = []for name in dir(module):                                                    # 获取这个模块下所有的内容obj = getattr(module, name)                                             # 判断obj是否是type实例,并且是否是TestCase的子类if isinstance(obj, type) and issubclass(obj, case.TestCase):tests.append(self.loadTestsFromTestCase(obj))                       # 如果是,则调用loadTestsFromTestCase去查找类方法load_tests = getattr(module, 'load_tests', None)tests = self.suiteClass(tests)                                              # TestSuite实例化成testsif load_tests is not None:try:return load_tests(self, tests, pattern)     except Exception as e:error_case, error_message = _make_failed_load_tests(module.__name__, e, self.suiteClass)self.errors.append(error_message)return error_casereturn tests                                                                # 返回

此时就直接调用了self.loadTestsFromTestCase去查找tests,

def loadTestsFromTestCase(self, testCaseClass):"""Return a suite of all tests cases contained in testCaseClass"""if issubclass(testCaseClass, suite.TestSuite):                          # 判断是否是TestSuite的子类raise TypeError("Test cases should not be derived from ""TestSuite. Maybe you meant to derive from ""TestCase?")testCaseNames = self.getTestCaseNames(testCaseClass)                    # 获取所有的test_*的属性并返回成列表if not testCaseNames and hasattr(testCaseClass, 'runTest'):             # 如果没有符合条件的列表,并且有runTest属性testCaseNames = ['runTest']                                         # 重写该列表loaded_suite = self.suiteClass(map(testCaseClass, testCaseNames))       # 实例化成TestSuite实例return loaded_suite                                                     # 返回该实例

其中,此时传入的testCaseClass就是本文例子中的ApplicationListAPITest,此时通过self.getTestCaseNames方法获取符合条件的属性列表,

def getTestCaseNames(self, testCaseClass):"""Return a sorted sequence of method names found within testCaseClass"""def isTestMethod(attrname, testCaseClass=testCaseClass,prefix=self.testMethodPrefix):                             # 此时的testMethodPrefix就是'test'return attrname.startswith(prefix) and \callable(getattr(testCaseClass, attrname))                          # 判断是否以'test'开头并且能够callabletestFnNames = list(filter(isTestMethod, dir(testCaseClass)))                # 获取该类所有的属性,并if self.sortTestMethodsUsing:testFnNames.sort(key=functools.cmp_to_key(self.sortTestMethodsUsing))   # 重写排序return testFnNames                                                          # 返回属性名称列表

此时就self.suiteClass(map(testCaseClass, testCaseNames)),此时的testCaseClass就是继承自TestCase的ApplicationListAPITest,testCaseNames为[‘test_list’],此时就是实例化了一个继承自TestCase的ApplicationListAPITest类的实例,实例化的时候,传入的是’test_list’;

    def __init__(self, methodName='runTest'):"""Create an instance of the class that will use the named testmethod when executed. Raises a ValueError if the instance doesnot have a method with the specified name."""self._testMethodName = methodName  # 传入待测试的方法名称self._outcome = Noneself._testMethodDoc = 'No test'

至此,tests查找完成,将找到的tests添加到TestSuite实例中。

创建测试数据库

完成了所有的tests添加到suite中,此时就会执行数据库创建过程;

old_config = self.setup_databases()

调用setup_databases函数;

def setup_databases(self, **kwargs):return setup_databases(self.verbosity, self.interactive, self.keepdb, self.debug_sql,self.parallel, **kwargs)       # 创建数据库...def get_unique_databases_and_mirrors():"""Figure out which databases actually need to be created.Deduplicate entries in DATABASES that correspond the same database or areconfigured as test mirrors.Return two values:- test_databases: ordered mapping of signatures to (name, list of aliases)where all aliases share the same underlying database.- mirrored_aliases: mapping of mirror aliases to original aliases."""mirrored_aliases = {}test_databases = {}dependencies = {}default_sig = connections[DEFAULT_DB_ALIAS].creation.test_db_signature()   # 获取配置HOST PORT ENGINE 数据库名称for alias in connections:connection = connections[alias]test_settings = connection.settings_dict['TEST']                        # 获取TEST配置信息if test_settings['MIRROR']:                                             # 是否设置MIRROR# If the database is marked as a test mirror, save the alias.mirrored_aliases[alias] = test_settings['MIRROR']else:# Store a tuple with DB parameters that uniquely identify it.# If we have two aliases with the same values for that tuple,# we only need to create the test database once.item = test_databases.setdefault(connection.creation.test_db_signature(),(connection.settings_dict['NAME'], set()))                                                                   # 设置HOST 端口数据库等信息item[1].add(alias)                                                  # if 'DEPENDENCIES' in test_settings:dependencies[alias] = test_settings['DEPENDENCIES']else:if alias != DEFAULT_DB_ALIAS and connection.creation.test_db_signature() != default_sig:dependencies[alias] = test_settings.get('DEPENDENCIES', [DEFAULT_DB_ALIAS])test_databases = dependency_ordered(test_databases.items(), dependencies)test_databases = collections.OrderedDict(test_databases)                    # 排序返回return test_databases, mirrored_aliases...def setup_databases(verbosity, interactive, keepdb=False, debug_sql=False, parallel=0, **kwargs):"""Creates the test databases."""test_databases, mirrored_aliases = get_unique_databases_and_mirrors()       # 获取数据 orderdict(('127.0.0.1','3306','django.db.backends.mysql','test_db'), ('test', {'default'}))old_names = []for signature, (db_name, aliases) in test_databases.items():                # 遍历first_alias = Nonefor alias in aliases:                                                   # 获取连接信息connection = connections[alias]old_names.append((connection, db_name, first_alias is None))# Actually create the database for the first connectionif first_alias is None:                                             # 第一次创建testfirst_alias = aliasconnection.creation.create_test_db(verbosity=verbosity,autoclobber=not interactive,keepdb=keepdb,serialize=connection.settings_dict.get("TEST", {}).get("SERIALIZE", True),)                                                               # 创建test数据库if parallel > 1:for index in range(parallel):connection.creation.clone_test_db(number=index + 1,verbosity=verbosity,keepdb=keepdb,)# Configure all other connections as mirrors of the first oneelse:connections[alias].creation.set_as_test_mirror(connections[first_alias].settings_dict)# Configure the test mirrors.for alias, mirror_alias in mirrored_aliases.items():connections[alias].creation.set_as_test_mirror(connections[mirror_alias].settings_dict)if debug_sql:                                                               # 是否调试sqlfor alias in connections:connections[alias].force_debug_cursor = True                        # 调试return old_names                                                            # 返回

其中,调用了创建方法;

    connection.creation.create_test_db(verbosity=verbosity,autoclobber=not interactive,keepdb=keepdb,serialize=connection.settings_dict.get("TEST", {}).get("SERIALIZE", True),)

主要是创建数据库,创建测试数据库相关信息;

def create_test_db(self, verbosity=1, autoclobber=False, serialize=True, keepdb=False):"""Creates a test database, prompting the user for confirmation if thedatabase already exists. Returns the name of the test database created."""# Don't import django.core.management if it isn't needed.from django.core.management import call_commandtest_database_name = self._get_test_db_name()                               # 获取测试数据库名称if verbosity >= 1:action = 'Creating'if keepdb:                                                              # 如果使用已存在的数据库action = "Using existing"print("%s test database for alias %s..." % (action,self._get_database_display_str(verbosity, test_database_name),))# We could skip this call if keepdb is True, but we instead# give it the keepdb param. This is to handle the case# where the test DB doesn't exist, in which case we need to# create it, then just not destroy it. If we instead skip# this, we will get an exception.self._create_test_db(verbosity, autoclobber, keepdb)                        # 新建数据库self.connection.close()                                                     # 关闭连接settings.DATABASES[self.connection.alias]["NAME"] = test_database_nameself.connection.settings_dict["NAME"] = test_database_name                  # 设置数据库名称# We report migrate messages at one level lower than that requested.# This ensures we don't get flooded with messages during testing# (unless you really ask to be flooded).call_command('migrate',verbosity=max(verbosity - 1, 0),interactive=False,database=self.connection.alias,run_syncdb=True,)                                                                           # 通过命令行命令调用Migrate生成数据库 # We then serialize the current state of the database into a string# and store it on the connection. This slightly horrific process is so people# who are testing on databases without transactions or who are using# a TransactionTestCase still get a clean database on every test run.if serialize:self.connection._test_serialized_contents = self.serialize_db_to_string()call_command('createcachetable', database=self.connection.alias)            # 创建缓存表# Ensure a connection for the side effect of initializing the test database.self.connection.ensure_connection()                                         # 确保新连接return test_database_name                                                   # 返回数据名
执行tests
result = self.run_suite(suite)

该函数主要就是运行找到的tests,执行流程如下;

def run_suite(self, suite, **kwargs):resultclass = self.get_resultclass()        # 设置resultclassreturn self.test_runner(verbosity=self.verbosity,failfast=self.failfast,resultclass=resultclass,).run(suite)                                # 运行找到的tests

主要是调用了unittest.TextTestRunner的run方法,将找到的sutie作为参数传入;

def run(self, test):"Run the given test case or test suite."result = self._makeResult()                                     # 初始化resultregisterResult(result)  result.failfast = self.failfastresult.buffer = self.bufferresult.tb_locals = self.tb_localswith warnings.catch_warnings():                                 # 捕获相关警告信息if self.warnings:# if self.warnings is set, use it to filter all the warningswarnings.simplefilter(self.warnings)# if the filter is 'default' or 'always', special-case the# warnings from the deprecated unittest methods to show them# no more than once per module, because they can be fairly# noisy.  The -Wd and -Wa flags can be used to bypass this# only when self.warnings is None.if self.warnings in ['default', 'always']:warnings.filterwarnings('module',category=DeprecationWarning,message='Please use assert\w+ instead.')startTime = time.time()                                     # 获取当前时间startTestRun = getattr(result, 'startTestRun', None)        # 获取是否有startTestRun属性if startTestRun is not None:startTestRun()                                          # 有就运行try:test(result)                                            # 调用test的__call__方法将result传入finally:stopTestRun = getattr(result, 'stopTestRun', None)      # 获取是否有stopTestRun方法if stopTestRun is not None:stopTestRun()stopTime = time.time()                                      # 获取结束时间timeTaken = stopTime - startTime                                # 获取运行的总时间result.printErrors()                                            # 处理相关运行的信息状态...                 # 相关运行结果的显示处理

此时就是调用了传入的suite的call方法,由于suite类继承自BaseTestSuite方法就是调用了该类的call方法,

def __call__(self, *args, **kwds):return self.run(*args, **kwds)

就是调用了run方法,此时的suite类是unittest.TestSuite,该类的run方法;

def run(self, result, debug=False):topLevel = Falseif getattr(result, '_testRunEntered', False) is False:result._testRunEntered = topLevel = Truefor index, test in enumerate(self):                             # 访问self.__iter__方法if result.shouldStop:breakif _isnotsuite(test):self._tearDownPreviousClass(test, result)self._handleModuleFixture(test, result)self._handleClassSetUp(test, result)result._previousTestClass = test.__class__if (getattr(test.__class__, '_classSetupFailed', False) orgetattr(result, '_moduleSetUpFailed', False)):continueif not debug:                                               # 如果debug为False则直接调用test的call方法test(result)else:test.debug()if self._cleanup:self._removeTestAtIndex(index)if topLevel:self._tearDownPreviousClass(None, result)self._handleModuleTearDown(result)result._testRunEntered = Falsereturn result

此时的self.iter方法对应如下;

def __iter__(self):return iter(self._tests)

所以,test(result)执行的就是test的call方法,此时的test都是TestCase类的实例;此时调用了基类的SimpleTestCase的call方法;

def __call__(self, result=None):"""Wrapper around default __call__ method to perform common Django testset up. This means that user-defined Test Cases aren't required toinclude a call to super().setUp()."""testMethod = getattr(self, self._testMethodName)skipped = (getattr(self.__class__, "__unittest_skip__", False) orgetattr(testMethod, "__unittest_skip__", False))if not skipped:try:self._pre_setup()                                   # 调用前期准备方法except Exception:result.addError(self, sys.exc_info())returnsuper(SimpleTestCase, self).__call__(result)                # 调用父类__call__方法if not skipped:try:self._post_teardown()                               # 如果不跳过则调用该接受清理数据except Exception:result.addError(self, sys.exc_info())return

在调用了self._post_teardown()方法,其中就有在执行完成后清理数据库的方法,

def _fixture_teardown(self):# Allow TRUNCATE ... CASCADE and don't emit the post_migrate signal# when flushing only a subset of the appsfor db_name in self._databases_names(include_mirrors=False):# Flush the databaseinhibit_post_migrate = (self.available_apps is not None or(   # Inhibit the post_migrate signal when using serialized# rollback to avoid trying to recreate the serialized data.self.serialized_rollback andhasattr(connections[db_name], '_test_serialized_contents')))call_command('flush', verbosity=0, interactive=False,database=db_name, reset_sequences=False,allow_cascade=self.available_apps is not None,inhibit_post_migrate=inhibit_post_migrate)         # 调用flush命令清理数据库数据

这就是在test执行完成后,test数据库还存在的话里面的数据都为空的原因,继续查看基类的TestCase基类的call方法,

def __call__(self, *args, **kwds):return self.run(*args, **kwds)

此时调用了TestSuite的run方法;

def run(self, result=None):orig_result = resultif result is None:                                                      # 如果传入的result为空result = self.defaultTestResult()                                   # 实例化一个默认的result.TestResult()实例startTestRun = getattr(result, 'startTestRun', None)                # 获取该实例的startTestRun属性if startTestRun is not None:                                        # 如果不为空则执行startTestRun()result.startTest(self)testMethod = getattr(self, self._testMethodName)                        # 获取自己的测试方法,test_listif (getattr(self.__class__, "__unittest_skip__", False) or              # 如果测试类上有跳过getattr(testMethod, "__unittest_skip__", False)):                   # 如果测试方法上有则跳过# If the class or method was skipped.try:skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')or getattr(testMethod, '__unittest_skip_why__', ''))self._addSkip(result, self, skip_why)finally:result.stopTest(self)                                           # 停止并返回returnexpecting_failure_method = getattr(testMethod,"__unittest_expecting_failure__", False)     # 获取失败方法expecting_failure_class = getattr(self,"__unittest_expecting_failure__", False)      # 获取失败类expecting_failure = expecting_failure_class or expecting_failure_methodoutcome = _Outcome(result)try:self._outcome = outcomewith outcome.testPartExecutor(self):self.setUp()                                                            # 调用setup方法if outcome.success:                                                         # 如果成功outcome.expecting_failure = expecting_failurewith outcome.testPartExecutor(self, isTest=True):testMethod()                                                        # 调用测试的方法并执行outcome.expecting_failure = Falsewith outcome.testPartExecutor(self):self.tearDown()                                                     # 最后调用tearDown方法self.doCleanups()                                                           # 清理for test, reason in outcome.skipped:self._addSkip(result, test, reason)self._feedErrorsToResult(result, outcome.errors)if outcome.success:if expecting_failure:if outcome.expectedFailure:self._addExpectedFailure(result, outcome.expectedFailure)else:self._addUnexpectedSuccess(result)else:result.addSuccess(self)                                             return result                                                               # 返回结果finally:result.stopTest(self)                                                       # 执行完成后进行清理工作if orig_result is None:stopTestRun = getattr(result, 'stopTestRun', None)if stopTestRun is not None:stopTestRun()# explicitly break reference cycles:# outcome.errors -> frame -> outcome -> outcome.errors# outcome.expectedFailure -> frame -> outcome -> outcome.expectedFailureoutcome.errors.clear()outcome.expectedFailure = None# clear the outcome, no more neededself._outcome = None

至此,一个test_list测试方法就算执行完成,其中的其他细节大家可以自行阅读。

测试执行完成清理数据库和环境
    self.teardown_databases(old_config)self.teardown_test_environment()

其中主要就是根据参数值判断是否需要删除创建的测试数据库;

def teardown_databases(self, old_config, **kwargs):"""Destroys all the non-mirror databases."""for connection, old_name, destroy in old_config:if destroy:if self.parallel > 1:for index in range(self.parallel):connection.creation.destroy_test_db(number=index + 1,verbosity=self.verbosity,keepdb=self.keepdb,)connection.creation.destroy_test_db(old_name, self.verbosity, self.keepdb)  # 删除数据库...def teardown_test_environment(self, **kwargs):unittest.removeHandler()                # 移除信号的handlerteardown_test_environment()             # 撤销环境信息

至此整个测试就执行完成,所有的条件都恢复成原来的值。

总结

作为Django项目中比较重要的test的过程,大致如上所述,先建立测试环境的数据,然后找到项目中的所有的测试用例,然后建立测试数据库并执行测试用例,然后测试用例运行完成后清理数据库数据,最后恢复删除掉配置的测试环境。大致流程如上所述,详细的细节大家可自行阅读,本文难免有疏漏,请批评指正。

Django源码分析8:单元测试test命令浅析相关推荐

  1. Django源码分析10:makemigrations命令概述

    django源码分析 本文环境python3.5.2,django1.10.x系列 django源码分析-makemigrations命令概述 Django项目中的数据库管理命令就是通过makemig ...

  2. Django源码分析7:migrate命令的浅析

    django源码分析 本文环境python3.5.2,django1.10.x系列 django源码分析-migrate命令分析 Django项目中提供了,通过migrations操作数据库的结构的命 ...

  3. Django源码分析4:staticfiles静态文件处理中间件分析

    django源码分析 本文环境python3.5.2,django1.10.x系列1.在上一篇文章中已经分析过handler的处理过程,其中load_middleware就是将配置的中间件进行初始化, ...

  4. Django源码分析2:本地运行runserver分析

    django源码分析 本文环境python3.5.2,django1.10.x系列1.根据上一篇文章分析了,django-admin startproject与startapp的分析流程后,根据dja ...

  5. Django源码分析9:model.py表结构的初始化概述

    django源码分析 本文环境python3.5.2,django1.10.x系列 django源码分析-model概述 Django项目中提供了内置的orm框架,只需要在models.py文件中添加 ...

  6. Django源码分析6:auth认证及登陆保持

    django源码分析 本文环境python3.5.2,django1.10.x系列 1.这次分析django框架中登陆认证与接口权限检查. 2.在后端开发中,难免会对接口进行权限验证,其中对于接口是否 ...

  7. Django源码分析5:session会话中间件分析

    django源码分析 本文环境python3.5.2,django1.10.x系列 1.这次分析django框架中的会话中间件. 2.会话保持是目前框架都支持的一个功能,因为http是无状态协议,无法 ...

  8. Django源码分析3:处理请求wsgi分析与视图View

    django源码分析 本文环境python3.5.2,django1.10.x系列 根据前上一篇runserver的博文,已经分析了本地调试服务器的大致流程,现在我们来分析一下当runserver运行 ...

  9. Django源码分析1:创建项目和应用分析

    django命令行源码分析 本文环境python3.5.2,django1.10.x系列 当命令行输入时django-admin时 (venv) ACA80166:dbManger wuzi$ dja ...

最新文章

  1. 数据库初识--从MySQL 出发
  2. Linux之DNS服务器搭建及常见DNS***和防御
  3. 社交网站将推动手游发展
  4. centos7安装DHCP服务器
  5. log4j slf4j实现_日志那点事儿——slf4j源码剖析
  6. C++socket编程(四):4.2 创建XTcp动态链接库项目
  7. 组策略不让你登陆你怎么办
  8. python爬虫案例——百度贴吧数据采集
  9. excel制作跨职能流程图_你会用Excel制作流程图吗?超级强大的功能
  10. 空气阻力对乒乓球运动轨迹的影响
  11. camtasia 2022标准版录制电脑屏幕视频教程
  12. 清华操作系统实验ucore_lab2
  13. 20211014gfsj_re_refor50plz50
  14. 淘宝客导购产品设计(二)
  15. 应用程序未安装:已安装了存在签名冲突的同名数据包。
  16. 【Docker 那些事儿】容器监控系统,来自Docker的暴击
  17. Python爬虫之猫眼APP电影数据(十八)
  18. 中职计算机组装与维修专业,教育部中等职业计算机示范专业规划教材:计算机组装与维修...
  19. KVM内核文档阅读笔记
  20. 算法工程师的正常研发流程

热门文章

  1. 用 Python 写 3D 游戏,太赞了
  2. 英特尔王锐:软硬件并驾齐驱,开发者是真英雄
  3. 又一起“删库”:链家程序员怒删公司 9TB 数据,被判 7 年
  4. 想成为一个数据科学家却不知道从何下手?这份路线图带你打开数据科学大门!...
  5. 口罩检测识别率惊人,这个Python项目开源了
  6. 贾扬清感谢信:阿里开源10年,致敬千万开源人
  7. 通俗易懂:8大步骤图解注意力机制
  8. Google148亿元收购Fitbit,抢占苹果、三星可穿戴设备市场地盘
  9. 打造 AI Beings,和微信合作…第七代微软小冰的成长之路
  10. AI一分钟 | 搜狗王小川:今年重点战略是输入法升级和发展机器翻译;北京无人驾驶试验场下半年正式运营