zoukankan      html  css  js  c++  java
  • 执行sql后,出现Assertion failed

    code:

    import pytest
    import pymysql
    # 操作数据库
    
    conn = pymysql.connect(
        user='root',
        passwd='123456',
        host='localhost',
        port=3306,
        db='my_test'
    )
    def test_db():
        qs = 'select uid,uname,pwd from test_user'
        lst = []
        try:
            cursor = conn.cursor()
            cursor.execute(qs)
            # 获取所有的数据
            r = cursor.fetchall()
            for x in r:
                # x 表示列
                u = (x[0],x[1],x[2])
                lst.append(u)
            return lst
        finally:
            cursor.close()
            conn.close()
    
    @pytest.mark.parametrize('uid,uname,pwd',test_db())
    def test04(uid,uname,pwd):
        print(uid,uname,pwd)
    
    if __name__ == '__main__':
        pytest.main(['-vs', 'test_file.py'])
    

    控制台

    Testing started at 11:10 ...
    D:TestSoftwareAnaconda3python.exe "D:TestSoftwarePyCharmPyCharm Community Edition 2020.1.3pluginspython-cehelperspycharm\_jb_pytest_runner.py" --path D:/TestSoftware/PyCharm/PyCharmProject/Jpress/Cases/ddt/test_file.py
    Launching pytest with arguments D:/TestSoftware/PyCharm/PyCharmProject/Jpress/Cases/ddt/test_file.py in D:TestSoftwarePyCharmPyCharmProjectJpressCasesddt
    
    ============================= test session starts =============================
    platform win32 -- Python 3.7.4, pytest-5.2.1, py-1.8.0, pluggy-0.13.0 -- D:TestSoftwareAnaconda3python.exe
    cachedir: .pytest_cache
    rootdir: D:TestSoftwarePyCharmPyCharmProjectJpressCasesddt
    plugins: allure-pytest-2.8.18, arraydiff-0.3, dependency-0.5.1, doctestplus-0.4.0, openfiles-0.4.0, remotedata-0.3.2
    collecting ... collected 4 items
    
    test_file.py::test_db FAILED                                             [ 25%]
    test_file.py:60 (test_db)
    def test_db():
            qs = 'select uid,uname,pwd from test_user'
            lst = []
            try:
                cursor = conn.cursor()
    >           cursor.execute(qs)
    
    test_file.py:66: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    
    self = <pymysql.cursors.Cursor object at 0x000001FC4322A5C8>
    query = 'select uid,uname,pwd from test_user', args = None
    
        def execute(self, query, args=None):
            """Execute a query
        
            :param str query: Query to execute.
        
            :param args: parameters used with query. (optional)
            :type args: tuple, list or dict
        
            :return: Number of affected rows
            :rtype: int
        
            If args is a list or tuple, %s can be used as a placeholder in the query.
            If args is a dict, %(name)s can be used as a placeholder in the query.
            """
            while self.nextset():
                pass
        
            query = self.mogrify(query, args)
        
    >       result = self._query(query)
    
    ..........Anaconda3libsite-packagespymysqlcursors.py:163: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    
    self = <pymysql.cursors.Cursor object at 0x000001FC4322A5C8>
    q = 'select uid,uname,pwd from test_user'
    
        def _query(self, q):
            conn = self._get_db()
            self._last_executed = q
            self._clear_result()
    >       conn.query(q)
    
    ..........Anaconda3libsite-packagespymysqlcursors.py:321: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    
    self = <pymysql.connections.Connection object at 0x000001FC4308F048>
    sql = b'select uid,uname,pwd from test_user', unbuffered = False
    
        def query(self, sql, unbuffered=False):
            # if DEBUG:
            #     print("DEBUG: sending query:", sql)
            if isinstance(sql, text_type) and not (JYTHON or IRONPYTHON):
                if PY2:
                    sql = sql.encode(self.encoding)
                else:
                    sql = sql.encode(self.encoding, 'surrogateescape')
    >       self._execute_command(COMMAND.COM_QUERY, sql)
    
    ..........Anaconda3libsite-packagespymysqlconnections.py:504: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    
    self = <pymysql.connections.Connection object at 0x000001FC4308F048>
    command = 3, sql = b'select uid,uname,pwd from test_user'
    
        def _execute_command(self, command, sql):
            """
            :raise InterfaceError: If the connection is closed.
            :raise ValueError: If no username was specified.
            """
            if not self._sock:
    >           raise err.InterfaceError(0, '')
    E           pymysql.err.InterfaceError: (0, '')
    
    ..........Anaconda3libsite-packagespymysqlconnections.py:742: InterfaceError
    
    During handling of the above exception, another exception occurred:
    
        def test_db():
            qs = 'select uid,uname,pwd from test_user'
            lst = []
            try:
                cursor = conn.cursor()
                cursor.execute(qs)
                # 获取所有的数据
                r = cursor.fetchall()
                for x in r:
                    # x 表示列
                    u = (x[0],x[1],x[2])
                    lst.append(u)
                return lst
            finally:
                cursor.close()
    >           conn.close()
    
    test_file.py:76: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    
    self = <pymysql.connections.Connection object at 0x000001FC4308F048>
    
        def close(self):
            """
            Send the quit message and close the socket.
        
            See `Connection.close() <https://www.python.org/dev/peps/pep-0249/#Connection.close>`_
            in the specification.
        
            :raise Error: If the connection is already closed.
            """
            if self._closed:
    >           raise err.Error("Already closed")
    E           pymysql.err.Error: Already closed
    
    ..........Anaconda3libsite-packagespymysqlconnections.py:356: Error
    
    test_file.py::test04[1-tom-123] 
    test_file.py::test04[2-kite-123] 
    test_file.py::test04[3-rows-456] 
    
    ================================== FAILURES ===================================
    ___________________________________ test_db ___________________________________
    
        def test_db():
            qs = 'select uid,uname,pwd from test_user'
            lst = []
            try:
                cursor = conn.cursor()
    >           cursor.execute(qs)
    
    test_file.py:66: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    
    self = <pymysql.cursors.Cursor object at 0x000001FC4322A5C8>
    query = 'select uid,uname,pwd from test_user', args = None
    
        def execute(self, query, args=None):
            """Execute a query
        
            :param str query: Query to execute.
        
            :param args: parameters used with query. (optional)
            :type args: tuple, list or dict
        
            :return: Number of affected rows
            :rtype: int
        
            If args is a list or tuple, %s can be used as a placeholder in the query.
            If args is a dict, %(name)s can be used as a placeholder in the query.
            """
            while self.nextset():
                pass
        
            query = self.mogrify(query, args)
        
    >       result = self._query(query)
    
    ..........Anaconda3libsite-packagespymysqlcursors.py:163: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    
    self = <pymysql.cursors.Cursor object at 0x000001FC4322A5C8>
    q = 'select uid,uname,pwd from test_user'
    
        def _query(self, q):
            conn = self._get_db()
            self._last_executed = q
            self._clear_result()
    >       conn.query(q)
    
    ..........Anaconda3libsite-packagespymysqlcursors.py:321: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    
    self = <pymysql.connections.Connection object at 0x000001FC4308F048>
    sql = b'select uid,uname,pwd from test_user', unbuffered = False
    
        def query(self, sql, unbuffered=False):
            # if DEBUG:
            #     print("DEBUG: sending query:", sql)
            if isinstance(sql, text_type) and not (JYTHON or IRONPYTHON):
                if PY2:
                    sql = sql.encode(self.encoding)
                else:
                    sql = sql.encode(self.encoding, 'surrogateescape')
    >       self._execute_command(COMMAND.COM_QUERY, sql)
    
    ..........Anaconda3libsite-packagespymysqlconnections.py:504: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    
    self = <pymysql.connections.Connection object at 0x000001FC4308F048>
    command = 3, sql = b'select uid,uname,pwd from test_user'
    
        def _execute_command(self, command, sql):
            """
            :raise InterfaceError: If the connection is closed.
            :raise ValueError: If no username was specified.
            """
            if not self._sock:
    >           raise err.InterfaceError(0, '')
    E           pymysql.err.InterfaceError: (0, '')
    
    ..........Anaconda3libsite-packagespymysqlconnections.py:742: InterfaceError
    
    During handling of the above exception, another exception occurred:
    
        def test_db():
            qs = 'select uid,uname,pwd from test_user'
            lst = []
            try:
                cursor = conn.cursor()
                cursor.execute(qs)
                # 获取所有的数据
                r = cursor.fetchall()
                for x in r:
                    # x 表示列
                    u = (x[0],x[1],x[2])
                    lst.append(u)
                return lst
            finally:
                cursor.close()
    >           conn.close()
    
    test_file.py:76: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    
    self = <pymysql.connections.Connection object at 0x000001FC4308F048>
    
        def close(self):
            """
            Send the quit message and close the socket.
        
            See `Connection.close() <https://www.python.org/dev/peps/pep-0249/#Connection.close>`_
            in the specification.
        
            :raise Error: If the connection is already closed.
            """
            if self._closed:
    >           raise err.Error("Already closed")
    E           pymysql.err.Error: Already closed
    
    ..........Anaconda3libsite-packagespymysqlconnections.py:356: Error
    ========================= 1 failed, 3 passed in 0.30s =========================
    
    Process finished with exit code 1
    PASSED                                   [ 50%]1 tom 123
    PASSED                                  [ 75%]2 kite 123
    PASSED                                  [100%]3 rows 456
    
    Assertion failed
    
    Assertion failed
    
    
  • 相关阅读:
    SQL Server误区30日谈Day15CheckPoint只会将已提交的事务写入磁盘
    SQL Server误区30日谈Day16数据的损坏和修复
    【译】Windows Azure迁移生命周期概览
    【译】实现为Windows Azure制定的迁移计划
    SQL Server误区30日谈Day18有关FileStream的存储,垃圾回收以及其它
    【译】为迁移到Windows Azure制定规划
    SQL Server误区30日谈Day14清除日志后会将相关的LSN填零初始化
    SQL PASS北京用户群成功举办第一次线下活动,性能调优PPT分享
    Windows Azure虚拟机概览
    SQL Pass北京将举办第一次线下活动,欢迎大家报名
  • 原文地址:https://www.cnblogs.com/c-jw/p/13998777.html
Copyright © 2011-2022 走看看