zoukankan      html  css  js  c++  java
  • python【串口操作】【自定义模块】

    1. serial和deque模块
    import sys
    import time
    import re
    from collections import deque
    
    try:
        import serial.tools.list_ports
    
        PYSERIAL_INSTALLED = True
    except ImportError:
        PYSERIAL_INSTALLED = False
    
    
    MAX_BUFFER = 65535
    # Line terminator written to the device and used when normalising its output.
    # NOTE(review): the published listing lost its backslash escapes; "\r\n" is
    # assumed here (the conventional telnet terminator) -- confirm against the
    # target device, which may expect a bare "\n".
    TELNET_RETURN = "\r\n"
    RETURN = TELNET_RETURN
    # ASCII backspace; some devices emit it when repainting a long command line.
    BACKSPACE_CHAR = "\x08"
    
    
    # Default keyword arguments handed to ``serial.Serial``; the "port" entry is
    # resolved to a concrete device name by ``upate_serial_port`` before use.
    serial_settings = dict(
        port="COM4",
        baudrate=115200,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
    )
    
    
    def upate_serial_port(serial_settings):
        """Replace ``serial_settings["port"]`` with the matching device name.

        The dictionary is modified in place. The "port" entry is only
        overwritten after the lookup succeeds, so a failed lookup leaves the
        settings exactly as they were (the previous implementation popped the
        key first and lost it on failure).

        :param serial_settings: mapping that contains a "port" entry
        :raises KeyError: if ``serial_settings`` has no "port" entry
        :raises ValueError: propagated from ``check_serial_port`` when the port
            cannot be resolved
        """
        serial_settings["port"] = check_serial_port(serial_settings["port"])
    
    
    def check_serial_port(name):
        """Return the device name of the first serial port matching ``name``.

        :param name: pattern passed to ``serial.tools.list_ports.grep``
        :returns: first element of the first matching port entry
        :raises ValueError: if pyserial is not installed, or if no port matches
            (the message then lists the ports that are available)
        """
        if not PYSERIAL_INSTALLED:
            msg = (
                "\npyserial is not installed. Please PIP install pyserial:\n\n"
                "pip install pyserial\n\n"
            )
            raise ValueError(msg)

        try:
            cdc = next(serial.tools.list_ports.grep(name))
        except StopIteration:
            available = "".join(
                f"{port}," for port in serial.tools.list_ports.comports()
            )
            msg = f"device {name} not found. available devices are: {available}"
            # The exhausted-iterator context adds nothing useful to the report.
            raise ValueError(msg) from None
        return cdc[0]
    
    
    def _write_channel(remote_conn, out_data, encoding="utf-8"):
        """Encode ``out_data`` with ``write_bytes``, write it, then flush.

        :param remote_conn: object exposing ``write`` and ``flush``
        :param out_data: text or bytes to send
        :param encoding: encoding name forwarded to ``write_bytes``
        """
        payload = write_bytes(out_data, encoding=encoding)
        remote_conn.write(payload)
        remote_conn.flush()
    
    
    def write_bytes(out_data, encoding="ascii"):
        """Return ``out_data`` as a byte string.

        :param out_data: ``str`` to encode, or ``bytes`` returned unchanged
        :param encoding: codec used for ``str`` input. With the default
            "ascii", characters that cannot be encoded are dropped (the
            historical behaviour). Any other codec is now honoured and applied
            strictly; previously everything except "utf-8" was silently treated
            as ASCII.
        :returns: ``bytes``
        :raises ValueError: if ``out_data`` is neither ``str`` nor ``bytes``
        """
        if isinstance(out_data, bytes):
            return out_data
        if isinstance(out_data, str):
            if encoding == "ascii":
                return out_data.encode("ascii", "ignore")
            return out_data.encode(encoding)
        msg = "Invalid value for out_data neither unicode nor byte string: {}".format(
            out_data
        )
        raise ValueError(msg)
    
    
    def _read_channel(remote_conn):
        output = ""
        while remote_conn.inWaiting() > 0:
            output += remote_conn.read(remote_conn.inWaiting()).decode(
                "utf-8", "ignore"
            )
        return output
    
    
    def establish_connection(serial_settings, username="root", password="password"):
        """Open the serial port described by ``serial_settings`` and log in.

        :param serial_settings: keyword arguments for ``serial.Serial``
        :param username: login name sent to the device (defaults to the value
            that used to be hard-coded)
        :param password: password sent to the device (defaults to the value
            that used to be hard-coded)
        :returns: the connection returned by ``serial_login``
        :raises Exception: whatever ``serial_login`` raises; the port is closed
            before the exception propagates
        """
        remote_conn = serial.Serial(**serial_settings)
        try:
            remote_conn, login_msg = serial_login(remote_conn, username, password)
        except Exception:
            # Do not leave the port open (and locked) when the login fails.
            remote_conn.close()
            raise
        print('login_msg:', login_msg)
        return remote_conn
    
    
    def open_connection(serial_settings):
        """Run the pre-connection hook, then connect and return the connection.

        :param serial_settings: settings forwarded to ``establish_connection``
        """
        _modify_connection_params()
        connection = establish_connection(serial_settings)
        return connection
    
    
    def _modify_connection_params():
        """Modify connection parameters prior connection."""
        pass
    
    
    def cleanup():
        """Hook for logout / extra cleanup before closing; currently does nothing."""
        return None
    
    
    def serial_login(
        remote_conn,
        username,
        password,
        pri_prompt_terminator=r"~#",  # device type1
        alt_prompt_terminator=r"\$",  # device type2
        username_pattern=r"(?:[Uu]ser:|sername|ogin)",
        pwd_pattern=r"assword",
        max_loops=30,
    ):
        """Log in over the serial console by delegating to ``telnet_login``.

        All arguments are forwarded unchanged.

        The alternate prompt default is a literal dollar sign (``\\$``). The
        unescaped ``$`` in the published listing matches the end of any text,
        which would make every read look like a successful login.

        :param remote_conn: open connection to log in on
        :param username: login name
        :param password: login password
        :param pri_prompt_terminator: regex identifying the primary prompt
        :param alt_prompt_terminator: regex identifying the alternate prompt
        :param username_pattern: regex identifying the username request
        :param pwd_pattern: regex identifying the password request
        :param max_loops: maximum number of login attempts
        :returns: the ``(remote_conn, return_msg)`` tuple from ``telnet_login``
        """
        return telnet_login(
            remote_conn,
            username,
            password,
            pri_prompt_terminator,
            alt_prompt_terminator,
            username_pattern,
            pwd_pattern,
            max_loops,
        )
    
    
    def telnet_login(
        remote_conn,
        username,
        password,
        pri_prompt_terminator=r"#\s*$",
        alt_prompt_terminator=r">\s*$",
        username_pattern=r"(?:user:|username|login|user name)",
        pwd_pattern=r"assword",
        max_loops=20,
    ):
        """Drive an interactive username/password login on ``remote_conn``.

        Each attempt reads the channel, answers a username and/or password
        request if one is seen, and returns as soon as either prompt regex
        matches. If nothing matched, a line terminator is sent and the loop
        retries. After ``max_loops`` attempts one final check is made.

        :param remote_conn: open connection
        :param username: text sent in response to ``username_pattern``
        :param password: text sent in response to ``pwd_pattern``
        :param pri_prompt_terminator: regex (``re.M``) marking a logged-in prompt
        :param alt_prompt_terminator: alternate regex (``re.M``) for the prompt
        :param username_pattern: case-insensitive regex for the username request
        :param pwd_pattern: case-insensitive regex for the password request
        :param max_loops: number of attempts before the final check
        :returns: ``(remote_conn, return_msg)`` where ``return_msg`` is all text
            read during the login
        :raises Exception: "Login failed" when no prompt is detected or the
            channel raises ``EOFError``; the connection is closed first
        """
        delay_factor = 1

        def _at_prompt(text):
            # True when either prompt regex matches the given text.
            return bool(
                re.search(pri_prompt_terminator, text, flags=re.M)
                or re.search(alt_prompt_terminator, text, flags=re.M)
            )

        time.sleep(1 * delay_factor)

        output = ""
        return_msg = ""
        for _ in range(max_loops):
            try:
                output = read_channel(remote_conn)
                return_msg += output

                print('output: ',output)

                # Search for username pattern / send username
                if re.search(username_pattern, output, flags=re.I):
                    # Sometimes username/password must be terminated with "\r" and not "\r\n"
                    write_channel(remote_conn, username + "\r\n")
                    time.sleep(1 * delay_factor)
                    output = read_channel(remote_conn)
                    return_msg += output

                # Search for password pattern / send password
                if re.search(pwd_pattern, output, flags=re.I):
                    # Sometimes username/password must be terminated with "\r" and not "\r\n"
                    write_channel(remote_conn, password + "\r\n")
                    time.sleep(0.5 * delay_factor)
                    output = read_channel(remote_conn)
                    return_msg += output

                # Check if proper data received (this also covers the text read
                # right after the password was sent).
                if _at_prompt(output):
                    return remote_conn, return_msg

                write_channel(remote_conn, TELNET_RETURN)
                time.sleep(0.5 * delay_factor)
            except EOFError as err:
                remote_conn.close()
                raise Exception("Login failed") from err

        # Last try to see if we already logged in
        write_channel(remote_conn, TELNET_RETURN)
        time.sleep(0.5 * delay_factor)
        output = read_channel(remote_conn)
        return_msg += output
        if _at_prompt(output):
            return remote_conn, return_msg

        remote_conn.close()
        raise Exception("Login failed")
    
    
    def write_channel(remote_conn,out_data):
        """Send ``out_data`` to the device through ``_write_channel``.

        :param remote_conn: open connection
        :param out_data: text or bytes to send
        """
        _write_channel(remote_conn, out_data)
    
    
    def read_channel(remote_conn):
        """Return whatever ``_read_channel`` reads from ``remote_conn``.

        :param remote_conn: open connection
        """
        return _read_channel(remote_conn)
    
    
    def send_command(
        remote_conn,
        command_string,
        expect_string=None,
        delay_factor=1,
        max_loops=500,
        auto_find_prompt=True,
        strip_prompt=True,
        strip_command=True,
        normalize=True,
    ):
        """Send a command and return the output collected up to a terminator.

        :param remote_conn: open connection
        :param command_string: command to send
        :param expect_string: regex that marks the end of the output. When
            ``None`` the current prompt is discovered and used (escaped).
        :param delay_factor: multiplier applied to the 0.2 s read delay
        :param max_loops: maximum number of read attempts
        :param auto_find_prompt: discover the prompt with ``find_prompt`` when
            no ``expect_string`` is given
        :param strip_prompt: passed to ``_sanitize_output`` as
            ``bool_strip_prompt``
        :param strip_command: passed to ``_sanitize_output`` as
            ``bool_strip_command``
        :param normalize: run ``normalize_cmd`` on the command before sending
        :returns: the sanitized output text
        :raises ValueError: if the prompt cannot be found, or if neither
            ``expect_string`` nor ``auto_find_prompt`` is provided (previously
            an ``UnboundLocalError``)
        :raises IOError: if the terminator is not seen within ``max_loops`` reads
        """
        # Time to delay in each read loop
        loop_delay = 0.2

        # Decide what marks the end of the command output.
        if expect_string is not None:
            search_pattern = expect_string
        elif auto_find_prompt:
            try:
                prompt = find_prompt(remote_conn)  # e.g. "AQQY[$]"
            except ValueError as err:
                raise ValueError('can not find the prompt!') from err
            search_pattern = re.escape(prompt.strip())
        else:
            raise ValueError(
                "expect_string must be given when auto_find_prompt is False"
            )

        if normalize:
            command_string = normalize_cmd(command_string)

        time.sleep(delay_factor * loop_delay)
        write_channel(remote_conn, command_string)
        new_data = ""

        cmd = command_string.strip()
        # if cmd is just an "enter" skip this section
        if cmd:
            # Make sure you read until you detect the command echo (avoid getting out of sync)
            new_data = read_until_pattern(remote_conn, pattern=re.escape(cmd))
            new_data = normalize_linefeeds(new_data)
            # Strip off everything before the command echo (to avoid false positives on the prompt)
            if new_data.count(cmd) == 1:
                new_data = new_data.split(cmd)[1:]
                new_data = TELNET_RETURN.join(new_data)
                new_data = new_data.lstrip()
                new_data = f"{cmd}{TELNET_RETURN}{new_data}"

        output = ""
        past_three_reads = deque(maxlen=3)
        first_line_processed = False

        # Keep reading data until search_pattern is found or until max_loops is reached.
        for _ in range(max_loops):
            if new_data:
                output += new_data
                past_three_reads.append(new_data)

                # Case where we haven't processed the first_line yet (there is a potential issue
                # in the first line (in cases where the line is repainted).
                if not first_line_processed:
                    output, first_line_processed = _first_line_handler(
                        output, search_pattern
                    )
                    # Check if we have already found our pattern
                    if re.search(search_pattern, output):
                        break

                else:
                    # Check if pattern is in the past three reads
                    if re.search(search_pattern, "".join(past_three_reads)):
                        break

            time.sleep(delay_factor * loop_delay)
            # The serial port has to be polled repeatedly.
            new_data = read_channel(remote_conn)
        else:  # loop exhausted without a break
            raise IOError(
                "Search pattern never detected in send_command: {}".format(
                    search_pattern
                )
            )

        output = _sanitize_output(
            output,
            bool_strip_command=strip_command,
            command_string=command_string,
            bool_strip_prompt=strip_prompt,
        )

        return output
    
    
    def _sanitize_output(output, bool_strip_command=False, command_string=None, bool_strip_prompt=False):
        """Normalise line endings and optionally strip the echo and the prompt.

        :param output: raw text read from the device
        :param bool_strip_command: remove the echoed command via ``strip_command``
        :param command_string: the command that was sent; the echo is only
            stripped when this is non-empty
        :param bool_strip_prompt: remove the trailing prompt via ``strip_prompt``
        :returns: the cleaned text
        """
        output = normalize_linefeeds(output)
        if bool_strip_command and command_string:
            # Give the command the same line endings as the output
            # (`\r\r\n`, `\r\n`, `\n\r` are all normalised).
            command_string = normalize_linefeeds(command_string)
            # Handles the echoed command, including automatic line wrapping.
            output = strip_command(command_string, output)
        if bool_strip_prompt:
            output = strip_prompt(output)
        return output
    
    
    # Remove the echoed command, including the backspaces that network devices
    # insert when a long command wraps.
    def strip_command(command_string, output):
        """Strip the echoed ``command_string`` from the start of ``output``.

        Cisco IOS adds backspaces into output for long commands (i.e. for
        commands that line wrap); they are removed before the echo is looked for.

        :param command_string: the command that was sent
        :param output: text read back from the device
        :returns: ``output`` without its first line when that line starts with
            the command; otherwise ``output`` with only backspaces removed
        """
        # Cisco IOS
        backspace_char = "\x08"
        output = output.replace(backspace_char, "")

        # Juniper has a weird case where the echoed command will be " \n"
        # i.e. there is an extra space there.
        cmd = command_string.strip()
        if not output.startswith(cmd):
            # command_string isn't there; do nothing
            return output
        output_lines = output.split(RETURN)
        return RETURN.join(output_lines[1:])
    
    
    # Adjust the default ``prompt`` marker to suit the device in use.
    def strip_prompt(a_string, prompt='#'):
        """Drop the last line of ``a_string`` when it contains ``prompt``.

        :param a_string: text whose lines are separated by ``RETURN``
        :param prompt: substring that identifies a prompt line
        :returns: the text without its final line, or unchanged when that line
            does not contain ``prompt``
        """
        body, _sep, final_line = a_string.rpartition(RETURN)
        return body if prompt in final_line else a_string
    
    
    def _first_line_handler(data, search_pattern):
        r"""Remove a repainted prompt from the first line of ``data``.

        In certain situations the first line (the command echo) gets repainted,
        which causes a false match on the terminating pattern. When that line
        contains a backspace character, everything from the first match of
        ``search_pattern`` to the end of the line is removed.

        Illustrative example with ``search_pattern = "#"``::

            'abc#cmd\x08' + RETURN + '#result'  ->  'abc' + RETURN + '#result'

        :param data: output collected so far
        :param search_pattern: regex for the terminating pattern
        :returns: ``(data, True)``; the flag tells the caller the first line has
            been handled. (The former ``IndexError`` fallback could never run:
            splitting a string always yields at least one element.)
        """
        # First line is the echo line containing the command.
        first_line, separator, remainder = data.partition(RETURN)
        if BACKSPACE_CHAR in first_line:
            pattern = search_pattern + r".*$"
            first_line = re.sub(pattern, repl="", string=first_line)
            data = first_line + separator + remainder
        return (data, True)
    
    
    def read_until_pattern(remote_conn, *args, **kwargs):
        """Read the channel until a pattern is seen; returns ALL data read.

        Thin wrapper: every argument is forwarded to ``_read_channel_expect``.
        """
        data = _read_channel_expect(remote_conn, *args, **kwargs)
        return data
    
    
    def _read_channel_expect(remote_conn, pattern="", re_flags=0, max_loops=150):
        output = ""
        if not pattern:
            pattern = re.escape(find_prompt(remote_conn))
    
        delay_factor = 1
        i = 1
        loop_delay = 0.1
        while i < max_loops:
            output += read_channel(remote_conn)
            if re.search(pattern, output, flags=re_flags):
                return output
            time.sleep(loop_delay * delay_factor)
            i += 1
        raise ValueError('time out for _read_channel_expect')
    
    
    def find_prompt(remote_conn):
        """Return the device's current prompt (last line of the response only).

        A line terminator is sent and the reply is read. While the reply is
        empty, up to 13 further reads are made, each followed by another line
        terminator and a growing pause (doubled while it is at most 3 s, then
        increased by 1 s).

        :param remote_conn: open connection
        :returns: the stripped last line of the response
        :raises ValueError: if no non-empty prompt line was obtained
        """
        pause = 1 * 0.1
        write_channel(remote_conn, TELNET_RETURN)
        time.sleep(pause)

        # Initial attempt to get prompt
        reply = read_channel(remote_conn).strip()

        # Retry while the only thing received was whitespace / a newline
        for _ in range(13):
            if reply:
                break
            reply = read_channel(remote_conn).strip()
            if reply:
                break
            write_channel(remote_conn, TELNET_RETURN)
            time.sleep(pause)
            # Double the pause while it is small, then grow it linearly
            pause = pause * 2 if pause <= 3 else pause + 1

        # If multiple lines in the output take the last line
        prompt = normalize_linefeeds(reply).split(TELNET_RETURN)[-1].strip()
        if not prompt:
            raise ValueError(f"Unable to find prompt: {prompt}")
        time.sleep(1 * 0.1)
        return prompt
    
    
    def normalize_linefeeds(a_string):
        r"""Convert `\r\r\r\n`, `\r\r\n`, `\r\n` and `\n\r` to ``TELNET_RETURN``.

        When ``TELNET_RETURN`` is a bare `\n`, any remaining `\r` is converted
        as well.

        :param a_string: text whose line endings should be normalised
        :returns: the normalised text
        """
        newline = re.compile("(\r\r\r\n|\r\r\n|\r\n|\n\r)")
        a_string = newline.sub(TELNET_RETURN, a_string)
        if TELNET_RETURN == "\n":
            # Convert any remaining \r to \n
            return re.sub("\r", TELNET_RETURN, a_string)
        return a_string
    
    
    def normalize_cmd(command):
        """Return ``command`` ending in exactly one ``TELNET_RETURN``.

        Trailing whitespace (including any existing line endings) is removed
        before the terminator is appended.

        :param command: Command that may require line feed to be normalized
        :type command: str
        """
        return command.rstrip() + TELNET_RETURN
    
    
    def disconnect(remote_conn):
        """Try to gracefully close the session.

        Best effort: errors from ``cleanup()`` or from closing the connection
        are suppressed. The connection is closed even when ``cleanup()`` fails
        (previously a cleanup error skipped the close).

        :param remote_conn: connection to close
        """
        try:
            cleanup()
        except Exception:
            # There have been race conditions observed on disconnect.
            pass
        finally:
            try:
                remote_conn.close()
            except Exception:
                # Closing is best effort as well; nothing more can be done here.
                pass
    
    
    
    
    if __name__ == '__main__':
        # Example session: resolve the port, log in, run one command, disconnect.
        upate_serial_port(serial_settings)
        remote_conn = open_connection(serial_settings)
        try:
            print(send_command(remote_conn,'lmi-eth'))
        finally:
            # Release the serial port even when the command fails.
            disconnect(remote_conn)
    
    
    
  • 相关阅读:
    美团大数据质量监控平台
    大数据理论体系总结--数据仓库管理与全链路数据体系
    spark图解
    forEach、map、filter、find、sort、some等易错点整理
    轮播图(jQuery)
    js 数组、对象转json 以及 json转 数组、对象
    jquery 获取元素(父节点,子节点,兄弟节点),元素筛选
    前端面试总结 (转 0C°)
    Git入门看这一篇就够了! (转)
    Rotating Scoreboard POJ
  • 原文地址:https://www.cnblogs.com/amize/p/15272349.html
Copyright © 2011-2022 走看看