# Dependencies: the serial (pyserial) and deque modules
import sys
import time
import re
from collections import deque
# pyserial is optional at import time. PYSERIAL_INSTALLED records whether the
# import worked so that later code can report a helpful error instead of
# failing on a missing name.
try:
    import serial.tools.list_ports

    PYSERIAL_INSTALLED = True
except ImportError:
    PYSERIAL_INSTALLED = False

MAX_BUFFER = 65535
# NOTE(review): the escape sequences of these literals were lost in the copy
# under review; CR+LF and the backspace control character are restored here
# following the upstream netmiko values -- confirm against the device.
TELNET_RETURN = "\r\n"
RETURN = TELNET_RETURN
BACKSPACE_CHAR = "\x08"

# Default values
# The fallbacks (8, "N", 1) are the values pyserial documents for EIGHTBITS,
# PARITY_NONE and STOPBITS_ONE. They keep this module importable when
# pyserial is missing, so the missing dependency is reported by the
# PYSERIAL_INSTALLED check instead of a NameError raised right here.
serial_settings = {
    "port": "COM4",
    "baudrate": 115200,
    "bytesize": serial.EIGHTBITS if PYSERIAL_INSTALLED else 8,
    "parity": serial.PARITY_NONE if PYSERIAL_INSTALLED else "N",
    "stopbits": serial.STOPBITS_ONE if PYSERIAL_INSTALLED else 1,
}
def upate_serial_port(serial_settings):
    """Replace the configured port name with the resolved device name, in place.

    The function name keeps its existing spelling because callers use it.

    :param serial_settings: settings dict holding a "port" entry; it is mutated
    :raises KeyError: if the dict has no "port" entry
    :raises ValueError: propagated from check_serial_port when the port cannot
        be resolved
    """
    # Assign instead of pop-then-update: if the lookup fails, the dict is left
    # exactly as the caller passed it rather than missing its "port" key.
    serial_settings["port"] = check_serial_port(serial_settings["port"])
def check_serial_port(name):
    """Return the device name of the first serial port matching *name*.

    :param name: text handed to ``serial.tools.list_ports.grep`` to select a port
    :returns: element 0 of the first matching port entry
    :raises ValueError: if pyserial is not installed, or if no port matches
        (the message then lists the ports that are available)
    """
    if not PYSERIAL_INSTALLED:
        # NOTE(review): the newline escapes of this message were lost in the
        # copy under review; restored following upstream netmiko -- confirm.
        msg = (
            "\npyserial is not installed. Please PIP install pyserial:\n\n"
            "pip install pyserial\n\n"
        )
        raise ValueError(msg)

    try:
        cdc = next(serial.tools.list_ports.grep(name))
    except StopIteration:
        # Nothing matched: list every known port so the user can pick one.
        available = "".join(
            f"{str(p)}," for p in serial.tools.list_ports.comports()
        )
        msg = f"device {name} not found. available devices are: {available}"
        raise ValueError(msg) from None
    return cdc[0]
def _write_channel(remote_conn, out_data, encoding="utf-8"):
    """Encode *out_data* with write_bytes, write it to *remote_conn*, then flush."""
    payload = write_bytes(out_data, encoding=encoding)
    remote_conn.write(payload)
    remote_conn.flush()
def write_bytes(out_data, encoding="ascii"):
    """Return *out_data* as bytes (legacy Python 2 / Python 3 compatibility helper).

    Byte strings pass through untouched. Text is encoded as UTF-8 only when
    *encoding* is exactly "utf-8"; for any other value it is encoded as ASCII
    with unencodable characters dropped.

    :raises ValueError: if *out_data* is neither text nor a byte string
    """
    if sys.version_info[0] >= 3:
        if isinstance(out_data, bytes):
            return out_data
        if isinstance(out_data, str):
            if encoding != "utf-8":
                return out_data.encode("ascii", "ignore")
            return out_data.encode("utf-8")
    raise ValueError(
        "Invalid value for out_data neither unicode nor byte string: {}".format(
            out_data
        )
    )
def _read_channel(remote_conn):
output = ""
while remote_conn.inWaiting() > 0:
output += remote_conn.read(remote_conn.inWaiting()).decode(
"utf-8", "ignore"
)
return output
def establish_connection(serial_settings, username="root", password="password"):
    """Open the serial port described by *serial_settings* and log in.

    :param serial_settings: keyword arguments passed to ``serial.Serial``
    :param username: login name; defaults to the previously hard-coded value
    :param password: login password; defaults to the previously hard-coded value
    :returns: the logged-in connection object returned by serial_login
    """
    port = serial.Serial(**serial_settings)
    try:
        remote_conn, login_msg = serial_login(port, username, password)
    except Exception:
        # Do not leave the port open when login fails for any reason.
        port.close()
        raise
    print('login_msg:', login_msg)
    return remote_conn
def open_connection(serial_settings):
    """Run the pre-connection hook, then open and return the connection."""
    _modify_connection_params()
    connection = establish_connection(serial_settings)
    return connection
def _modify_connection_params():
"""Modify connection parameters prior connection."""
pass
def cleanup():
    """Hook for logging out and extra session cleanup; currently does nothing."""
    return None
def serial_login(
    remote_conn,
    username,
    password,
    pri_prompt_terminator=r"~#",  # device type1
    alt_prompt_terminator=r"\$",  # device type2
    username_pattern=r"(?:[Uu]ser:|sername|ogin)",
    pwd_pattern=r"assword",
    max_loops=30,
):
    """Log in over the serial line by delegating to telnet_login.

    All arguments are forwarded unchanged; only the defaults differ.

    NOTE(review): the alternate prompt default is an escaped dollar sign. A
    bare "$" is an end-of-line anchor that matches any output, which would
    make the login check succeed immediately; the backslash appears to have
    been lost in the copy under review -- confirm the intended prompt.

    :returns: the ``(remote_conn, login_output)`` tuple from telnet_login
    """
    return telnet_login(
        remote_conn,
        username,
        password,
        pri_prompt_terminator,
        alt_prompt_terminator,
        username_pattern,
        pwd_pattern,
        max_loops,
    )
def telnet_login(
    remote_conn,
    username,
    password,
    pri_prompt_terminator=r"#\s*$",
    alt_prompt_terminator=r">\s*$",
    username_pattern=r"(?:user:|username|login|user name)",
    pwd_pattern=r"assword",
    max_loops=20,
):
    """Drive a username/password login dialogue until a prompt is seen.

    Each loop reads the channel, answers a username and/or password prompt
    when one is detected, and returns as soon as either prompt terminator
    matches the latest output. Otherwise a line terminator is sent and the
    loop repeats, up to *max_loops* times, followed by one final attempt.

    :param pri_prompt_terminator: regex (multi-line mode) for the main prompt
    :param alt_prompt_terminator: regex (multi-line mode) for the other prompt
    :param username_pattern: case-insensitive regex for the username prompt
    :param pwd_pattern: case-insensitive regex for the password prompt
    :returns: ``(remote_conn, return_msg)`` where return_msg is all text read
    :raises Exception: "Login failed" if no prompt is reached or the channel
        raises EOFError; the connection is closed first in both cases
    """

    def _at_prompt(text):
        # True when either prompt terminator matches the given text.
        return bool(
            re.search(pri_prompt_terminator, text, flags=re.M)
            or re.search(alt_prompt_terminator, text, flags=re.M)
        )

    delay_factor = 1
    time.sleep(1 * delay_factor)

    output = ""
    return_msg = ""
    i = 1
    while i <= max_loops:
        try:
            output = read_channel(remote_conn)
            return_msg += output
            print('output: ', output)

            # Search for username pattern / send username
            if re.search(username_pattern, output, flags=re.I):
                # NOTE(review): the terminator literal was lost in the copy
                # under review; a bare carriage return is restored here as in
                # upstream netmiko (some devices reject CR+LF) -- confirm.
                write_channel(remote_conn, username + "\r")
                time.sleep(1 * delay_factor)
                output = read_channel(remote_conn)
                return_msg += output

            # Search for password pattern / send password
            if re.search(pwd_pattern, output, flags=re.I):
                # Same terminator caveat as for the username above.
                write_channel(remote_conn, password + "\r")
                time.sleep(0.5 * delay_factor)
                output = read_channel(remote_conn)
                return_msg += output

            # Check if proper data received. This one check also covers the
            # output read right after the password was sent.
            if _at_prompt(output):
                return remote_conn, return_msg

            write_channel(remote_conn, TELNET_RETURN)
            time.sleep(0.5 * delay_factor)
            i += 1
        except EOFError as err:
            remote_conn.close()
            raise Exception("Login failed") from err

    # Last try to see if we already logged in
    write_channel(remote_conn, TELNET_RETURN)
    time.sleep(0.5 * delay_factor)
    output = read_channel(remote_conn)
    return_msg += output
    if _at_prompt(output):
        return remote_conn, return_msg

    remote_conn.close()
    raise Exception("Login failed")
def write_channel(remote_conn, out_data):
    """Send *out_data* over *remote_conn* via _write_channel."""
    _write_channel(remote_conn, out_data)
def read_channel(remote_conn):
    """Return whatever _read_channel currently yields for *remote_conn*."""
    return _read_channel(remote_conn)
def send_command(
    remote_conn,
    command_string,
    expect_string=None,
    delay_factor=1,
    max_loops=500,
    auto_find_prompt=True,
    strip_prompt=True,
    strip_command=True,
    normalize=True,
):
    """Send a command and read until the expected pattern shows up.

    :param expect_string: regex marking the end of the output; when None the
        current prompt is discovered with find_prompt and used (escaped)
    :param delay_factor: multiplier for the 0.2 s pause between reads
    :param max_loops: maximum number of read iterations
    :param auto_find_prompt: must be True when *expect_string* is None
    :param strip_prompt: drop the trailing prompt line from the result
    :param strip_command: drop the echoed command from the result
    :param normalize: give the command exactly one trailing line terminator
    :returns: the sanitized command output
    :raises ValueError: if the prompt cannot be found, or if neither
        *expect_string* nor *auto_find_prompt* is supplied
    :raises IOError: if the pattern is not seen within *max_loops* reads
    """
    # Time to delay in each read loop
    loop_delay = 0.2

    # Find the current router prompt
    if expect_string is None:
        if not auto_find_prompt:
            # Previously this path fell through to an unbound local variable.
            raise ValueError(
                "expect_string is required when auto_find_prompt is False"
            )
        try:
            # original note: AQQY[$] (presumably an example prompt)
            prompt = find_prompt(remote_conn)
        except ValueError as err:
            raise ValueError('can not find the prompt!') from err
        search_pattern = re.escape(prompt.strip())
    else:
        search_pattern = expect_string

    if normalize:
        command_string = normalize_cmd(command_string)

    time.sleep(delay_factor * loop_delay)
    write_channel(remote_conn, command_string)
    new_data = ""

    cmd = command_string.strip()
    # if cmd is just an "enter" skip this section
    if cmd:
        # Make sure you read until you detect the command echo (avoid getting out of sync)
        new_data = read_until_pattern(remote_conn, pattern=re.escape(cmd))
        new_data = normalize_linefeeds(new_data)
        # Strip off everything before the command echo (to avoid false positives on the prompt)
        if new_data.count(cmd) == 1:
            new_data = new_data.split(cmd)[1:]
            new_data = TELNET_RETURN.join(new_data)
            new_data = new_data.lstrip()
            new_data = f"{cmd}{TELNET_RETURN}{new_data}"

    i = 1
    output = ""
    past_three_reads = deque(maxlen=3)
    first_line_processed = False

    # Keep reading data until search_pattern is found or until max_loops is reached.
    while i <= max_loops:
        if new_data:
            output += new_data
            past_three_reads.append(new_data)

            # Case where we haven't processed the first_line yet (there is a potential issue
            # in the first line (in cases where the line is repainted).
            if not first_line_processed:
                output, first_line_processed = _first_line_handler(
                    output, search_pattern
                )
                # Check if we have already found our pattern
                if re.search(search_pattern, output):
                    break
            else:
                # Check if pattern is in the past three reads
                if re.search(search_pattern, "".join(past_three_reads)):
                    break

        time.sleep(delay_factor * loop_delay)
        i += 1
        # The serial port has to be read repeatedly
        new_data = read_channel(remote_conn)
    else:  # loop ended without a break
        raise IOError(
            "Search pattern never detected in send_command: {}".format(
                search_pattern
            )
        )

    output = _sanitize_output(
        output,
        bool_strip_command=strip_command,
        command_string=command_string,
        bool_strip_prompt=strip_prompt,
    )
    return output
def _sanitize_output(output, bool_strip_command=False, command_string=None, bool_strip_prompt=False):
    """Normalize line endings, then optionally strip the command echo and prompt.

    :param bool_strip_command: remove the echoed command (needs *command_string*)
    :param command_string: the command that was sent
    :param bool_strip_prompt: remove the trailing prompt line
    :returns: the cleaned output text
    """
    cleaned = normalize_linefeeds(output)
    if bool_strip_command and command_string:
        # Give the command the same line-ending normalization as the output
        # before removing its echo (this also handles auto-wrapped commands).
        normalized_cmd = normalize_linefeeds(command_string)
        cleaned = strip_command(normalized_cmd, cleaned)
    if not bool_strip_prompt:
        return cleaned
    return strip_prompt(cleaned)
# Remove the automatic line wrapping that network devices add to the echo
def strip_command(command_string, output):
    """
    Strip command_string from output string

    Cisco IOS adds backspaces into output for long commands (i.e. for commands that line wrap)

    :returns: *output* with backspace characters removed and, when it starts
        with the command, without its first line
    """
    # Cisco IOS
    # The backspace control character; the escape had been lost, leaving the
    # three-character text "x08", which never matched anything.
    backspace_char = "\x08"
    output = output.replace(backspace_char, "")

    # Juniper has a weird case where the echoed command is followed by an
    # extra space before the line break, hence the strip() on the command.
    cmd = command_string.strip()
    if not output.startswith(cmd):
        # command_string isn't there; do nothing
        return output
    output_lines = output.split(RETURN)
    return RETURN.join(output_lines[1:])
# Adjust the default prompt to suit the device
def strip_prompt(a_string, prompt='#'):
    """Drop the last line of *a_string* when that line contains *prompt*.

    :returns: the text without its final line, or *a_string* unchanged when
        the final line does not contain *prompt*
    """
    body, _separator, last_line = a_string.rpartition(RETURN)
    if prompt not in last_line:
        return a_string
    return body
def _first_line_handler(data, search_pattern):
    """
    In certain situations the first line will get repainted which causes a false
    match on the terminating pattern.

    When the first line (the command echo) contains a backspace character,
    everything from the first match of *search_pattern* to the end of that
    line is removed; all later lines are left untouched.

    :returns: ``(data, True)`` -- the flag reports that the first line has
        been handled
    """
    # First line is the echo line containing the command. In certain situations
    # it gets repainted and needs filtering.
    echo_line, separator, remainder = data.partition(RETURN)
    if BACKSPACE_CHAR in echo_line:
        echo_line = re.sub(search_pattern + r".*$", repl="", string=echo_line)
        data = echo_line + separator + remainder
    return (data, True)
def read_until_pattern(remote_conn, *args, **kwargs):
    """Read the channel until a pattern is detected; return ALL data read.

    Thin wrapper that forwards every argument to _read_channel_expect.
    """
    collected = _read_channel_expect(remote_conn, *args, **kwargs)
    return collected
def _read_channel_expect(remote_conn, pattern="", re_flags=0, max_loops=150):
output = ""
if not pattern:
pattern = re.escape(find_prompt(remote_conn))
delay_factor = 1
i = 1
loop_delay = 0.1
while i < max_loops:
output += read_channel(remote_conn)
if re.search(pattern, output, flags=re_flags):
return output
time.sleep(loop_delay * delay_factor)
i += 1
raise ValueError('time out for _read_channel_expect')
def find_prompt(remote_conn, delay_factor=1):
    """Finds the current network device prompt, last line only.

    A line terminator is sent and the reply is read. While the reply is
    empty, the terminator is re-sent (up to 13 attempts) with a growing
    pause: the pause doubles while it is at most 3 seconds and grows by one
    second after that.

    :param delay_factor: multiplier applied to the initial and final 0.1 s
        pauses (default 1, the value that was previously fixed)
    :type delay_factor: int
    :returns: the stripped last line of the reply
    :raises ValueError: if no non-empty prompt text is received
    """
    write_channel(remote_conn, TELNET_RETURN)
    sleep_time = delay_factor * 0.1
    time.sleep(sleep_time)

    # Initial attempt to get prompt
    prompt = read_channel(remote_conn).strip()

    # Check if the only thing you received was a newline
    count = 0
    while count <= 12 and not prompt:
        prompt = read_channel(remote_conn).strip()
        if not prompt:
            write_channel(remote_conn, TELNET_RETURN)
            time.sleep(sleep_time)
            if sleep_time <= 3:
                # Double the sleep_time when it is small
                sleep_time *= 2
            else:
                sleep_time += 1
        count += 1

    # If multiple lines in the output take the last line
    prompt = normalize_linefeeds(prompt)
    prompt = prompt.split(TELNET_RETURN)[-1]
    prompt = prompt.strip()
    if not prompt:
        raise ValueError(f"Unable to find prompt: {prompt}")
    time.sleep(delay_factor * 0.1)
    return prompt
def normalize_linefeeds(a_string):
    r"""Convert `\r\r\r\n`, `\r\r\n`, `\r\n` and `\n\r` to TELNET_RETURN.

    When TELNET_RETURN is a bare `\n`, any remaining `\r` is converted too.

    NOTE(review): the escape sequences of the literals below were lost in the
    copy under review and are restored following upstream netmiko -- confirm.
    """
    newline = re.compile("(\r\r\r\n|\r\r\n|\r\n|\n\r)")
    a_string = newline.sub(TELNET_RETURN, a_string)
    if TELNET_RETURN == "\n":
        # Convert any remaining \r to \n
        return a_string.replace("\r", TELNET_RETURN)
    return a_string
def normalize_cmd(command):
    """Normalize CLI commands to have a single trailing newline.

    Trailing whitespace is removed and TELNET_RETURN is appended.

    :param command: Command that may require line feed to be normalized
    :type command: str
    """
    return command.rstrip() + TELNET_RETURN
def disconnect(remote_conn):
    """Try to gracefully close the session.

    Both steps are best effort: errors are deliberately swallowed because
    race conditions have been observed on disconnect. They are guarded
    separately so a failing cleanup() cannot prevent the connection from
    being closed.
    """
    try:
        cleanup()
    except Exception:
        pass
    try:
        remote_conn.close()
    except Exception:
        # There have been race conditions observed on disconnect.
        pass
if __name__ == '__main__':
    # Resolve the port, log in, run one command and print its output.
    upate_serial_port(serial_settings)
    remote_conn = open_connection(serial_settings)
    try:
        print(send_command(remote_conn, 'lmi-eth'))
    finally:
        # Release the serial port even if the command fails.
        disconnect(remote_conn)