zoukankan      html  css  js  c++  java
  • 使用twisted框架编写的测试工具,如何与pytest框架结合

    一. 自动化需求

    1. 设备A云平台是通过自定义协议(JT808协议)通信的

    2. 基于twisted框架模拟设备A,模拟脚本可替代设备A与云平台进行通信

    3. 使用pytest+allure测试方案, 并采用数据驱动方式, 快速验证云端的端云交互逻辑的正确性

    二. 遇到问题

    1. 启动设备模拟工具后, twisted.run()之后的代码不会被执行(看到的现象是主线程会一直处于卡住的状态)

    三. 解决方案

    思路: 以线程的方式运行twisted.run(), 此时需要注意installSignalHandlers参数的使用

    3.1 模拟工具脚本文件

    # -*- coding: utf-8 -*-
    # @Time    : 2021/3/4 20:03
    # @Author  : chinablue
    # @File    : device_simulate.py
    
    from twisted.internet.protocol import ClientFactory, Protocol
    
    
    class DeviceProto(Protocol):
    
        def dataReceived(self, data):
            """缓存消息, 转义消息, 提取消息, 分发消息"""
            print(data)
    
        def dataSend(self, msg_id, data):
            """向云端发送消息"""
            self.transport.write(data)
    
    
    class DeviceFactory(ClientFactory):
    
        def __init__(self):
            self.server = DeviceProto()
    
        def buildProtocol(self, addr):
            return self.server

    3.2 pytest中的conftest.py文件

    # -*- coding: utf-8 -*-
    # @Time    : 2021/3/1 14:51
    # @Author  : chinablue
    # @File    : conftest.py
    
    import threading
    
    import pytest
    import allure
    from twisted.internet import reactor as twisted_reactor
    
    from device_simulate import DeviceFactory
    
    
    @pytest.fixture(scope="session", autouse=True)
    def df():
        with allure.step(f"启动服务"):
            df = DeviceFactory()
    
            def task_start_server(df):
                ip, port = "云端ip", int("云端端口")
                twisted_reactor.connectTCP(ip, port, df)
                twisted_reactor.run(installSignalHandlers=False)
    
            t = threading.Thread(
                target=task_start_server,
                args=(df,),
                daemon=True
            )
            t.start()
    
        yield df
    
        with allure.step(f"停止服务"):
            twisted_reactor.callFromThread(twisted_reactor.stop)
            t.join()

    注意事项:

      1) 在子线程中运行twisted.run(), 需要参数: installSignalHandlers=False(注意上方代码标红处)

      2) 测试夹具df返回的df对象,可以调用到DeviceProto类中的dataSend()方法. 

               在用例文件中可以通过df.server.dataSend()方式来控制用例在何时向云端发出怎样的数据(注意下方代码标红处)

    3.3 用例文件(由于真实使用场景较为复杂,这里只给出部分代码)

    # -*- coding: utf-8 -*-
    # @Time    : 2021/3/1 11:36
    # @Author  : chinablue
    # @File    : test_demo.py
    
    import allure
    import pytest
    
    from case.base_case import BaseCase
    
    test_data_list = [
        {
            "case_desc": "触发告警: 紧急报警",
            "alert_flag": "00000001",
            "expect_alarmValue": "紧急报警"
        },
        {
            "case_desc": "触发告警: 超速报警",
            "alert_flag": "00000002",
            "expect_alarmValue": "超速报警"
        },
        {
            "case_desc": "触发告警: 紧急报警,超速报警",
            "alert_flag": "00000003",
            "expect_alarmValue": "紧急报警,超速报警"
        },
    ]
    
    test_params_str, test_data_list = BaseCase.handle_testdata(test_data_list)
    ids = [test_data[0] for test_data in test_data_list]
    
    
    @pytest.mark.parametrize(test_params_str, test_data_list, ids=ids)
    @allure.feature(f"实时上报接口")
    @allure.story(f"部标告警")
    class TestRealtimeAlert(BaseCase):
    
        def test1(self, df, init_location_info, case_desc, alert_flag, expect_alarmValue):
            with allure.step(f"设备操作: 位置信息上报(有告警)"):
                alert_flag = alert_flag
                status = None
                speed = None
                df.server.dataSend(
                    self.proto_msgid.location_info_upload,
                    data=self.terminal_proto.gen_message_location_info_upload(
                        alert_flag=alert_flag,
                        status=status,
                        speed=speed
                    )
                )
    
            with allure.step(f"云端验证: 查询[实时位置信息]接口"):
                self.web_validate.check_realtime_alarmValue(
                    devId=self.config.dev_id,
                    alarmValue=expect_alarmValue
                )

    四. 效果展示

  • 相关阅读:
    P4014 分配问题 网络流
    P4015 运输问题 网络流问题
    P4013 数字梯形问题 网络流
    网络流 P2770 航空路线问题
    网络流之最小费用最大流 P1251 餐巾计划问题
    二分图定理
    数论 C
    网络流 E
    网络流 之 P2766 最长不下降子序列问题
    scp使用
  • 原文地址:https://www.cnblogs.com/reconova-56/p/14482842.html
Copyright © 2011-2022 走看看