基于Python同Linux进行交互式操作实现通过堡垒机访问目标机
by:授客 QQ:1033553122 欢迎加入全国软件测试交流群:7156436
远程登录Linux堡垒机,同Linux进行交互式操作,访问目标机
Win7 64位
Python 3.3.4
paramiko 1.15.2
下载地址:
https://pypi.python.org/pypi/paramiko/1.15.2
https://pan.baidu.com/s/1i4SJ1CL
cryptography-1.0-cp34-none-win_amd64.whl
(如果paramiko可以正常安装完,则不需要安装该类库)
下载地址:
https://pypi.python.org/pypi/cryptography/1.0
https://pan.baidu.com/s/1jIRBJvg
安装好后,找到nt.py(本例中路径为:
Libsite-packagespycrypto-2.6.1-py3.4-win-amd64.eggCryptoRandomOSRNG t.py),修改
import winrandom
为
from Crypto.Random.OSRNG import winrandom
如下
#import winrandom
from Crypto.Random.OSRNG import winrandom
以解决ImportError: No module named 'winrandom'错误
说明:具体文件路径可能还得根据实际报错情况来确定,如下
............(略)
"D:Program Filespython33libsite-packagesCryptoRandomOSRNG t.py", line 28, in
import winrandom
ImportError: No module named 'winrandom'
VS2010
因操作系统而异,可能需要安装VS2010,以解决包依赖问题
需求
SSH登录堡垒机后,根据提示输入相关信息,进入到目标机,如下
#!/usr/bin/env/ python
#
-*- coding:utf-8 -*-
__author__
=
'shouke'
from
paramiko.client
import
AutoAddPolicy
from
paramiko.client
import
SSHClient
class
MySSHClient:
def
__init__(self):
self.ssh_client
= SSHClient()
#
连接登录
def
connect(self,
hostname, port, username, password,
host_via_by_bastion):
try:
self.ssh_client.set_missing_host_key_policy(AutoAddPolicy())
self.ssh_client.connect(hostname=hostname,
port=port,
username=username,
password=password,
timeout=30)
channel
=
self.ssh_client.invoke_shell()
channel.settimeout(10)
# 读、写操作超时时间,10秒
######################
定制化开发 ###########
print('正在通过堡垒机:%s
访问目标机:%s'
% (hostname, host_via_by_bastion))
host_via_by_bastion
= host_via_by_bastion +
'
'
prompt_input_list
= [{'Please
enter your ID':'xxx255222
'},
{'Please
enter your password':
'passwd123
'},
{'Please
select your app ip':host_via_by_bastion},
{'select
your user for login':'1
'}]
end_flag1
=
']$'
end_flag2
=
']#'
stdout
=
'' #
存放每次执行读取的内容
for
item
in
prompt_input_list:
prompt
=
list(item.keys())[0]
input
= item[prompt]
flag
=
False
while
stdout.find(prompt) == -1:
try:
stdout
+= channel.recv(65535).decode('utf-8')#.decode('utf-8')
flag
=
True
except
Exception
as
e:
print('通过堡垒机:%s
访问目标机:%s
失败:%s'
% (hostname, host_via_by_bastion, e))
flag
=
False
break
if
flag:
channel.send(input)
else:
# 未找到了对应提示
return
[False,
'通过堡垒机:%s
访问目标机:%s
失败,可能是读取命令返回结果超时,或者没找到对应输入提示' %
(hostname, host_via_by_bastion)]
flag
=
False
while
not
(stdout.rstrip().endswith(end_flag1)
or
stdout.rstrip().endswith(end_flag2)):
try:
stdout
= channel.recv(2048).decode('utf-8')
flag
=
True
except
Exception
as
e:
print('通过堡垒机:%s
访问目标机:%s
失败:%s'
% (hostname, host_via_by_bastion, e))
flag
=
False
break
if
flag:
return
[True,
'']
else:
# 未找到了对应提示
return
[False,
'没出现成功登录提示符 ]$
或 ]#
']
channel.close()
return
[True,
'']
except
Exception
as
e:
return
[False,
'%s'
% e]
#
远程执行命令
def
exec_command(self,
command, target_host, bastion_host):
try:
channel
=
self.ssh_client.invoke_shell()
channel.settimeout(30)
# 读、写操作超时时间,30秒
end_flag1
=
']$'
end_flag2
=
']#'
################
定制化开发 ##############################
if
bastion_host !=
'':
print('正在通过堡垒机:%s
访问目标机:%s'
% (bastion_host, target_host))
target_host_input
= target_host +
'
'
prompt_input_list
= [{'Please
enter your ID':'01367599
'},
{'Please
enter your password':
'Huozhe2020
'},
{'Please
select your app ip':target_host_input},
{'select
your user for login':'1
'}]
stdout
=
'' #
存放每次执行读取的内容
for
item
in
prompt_input_list:
prompt
=
list(item.keys())[0]
input
= item[prompt]
flag
=
False
while
stdout.find(prompt) == -1:
try:
stdout
+= channel.recv(65535).decode('utf-8')#.decode('utf-8')
flag
=
True
except
Exception
as
e:
print('通过堡垒机:%s
访问目标机:%s
失败:%s'
% (bastion_host, target_host,
e))
flag
=
False
break
if
flag:
channel.send(input)
else:
# 未找到了对应提示
print('通过堡垒机:%s
访问目标机:%s
失败,可能是读取命令返回结果超时,或者没找到对应输入提示' %
(bastion_host, target_host))
#
return [False, '通过堡垒机:%s 访问目标机:%s 失败,可能是读取命令返回结果超时,或者没找到对应输入提示'
% (bastion_host,
target_host)]
return
while
not
(stdout.rstrip().endswith(end_flag1)
or
stdout.rstrip().endswith(end_flag2)):
try:
stdout
= channel.recv(2048).decode('utf-8')
except
Exception
as
e:
print('通过堡垒机:%s
访问目标机:%s
失败:没出现成功登录提示符 ]$ 或 ]#'
% (bastion_host, target_host))
return
channel.send(command+'
')
command_res
=
'' #
存放每次执行读取的内容
while
not
(command_res.endswith(end_flag1)
or
command_res.endswith(end_flag2)):
try:
command_res
= channel.recv(2048).decode('utf-8').strip()
except
Exception
as
e:
print('在目标机(IP:
%s)上进行读取操作超时'
% target_host)
break
channel.close()
except
Exception
as
e:
print('针对目标机:%s
执行命令: %s
出错 %s'
% (target_host, command, e))