zoukankan      html  css  js  c++  java
  • Python_基于Python同Linux进行交互式操作实现通过堡垒机访问目标机

    基于PythonLinux进行交互式操作实现通过堡垒机访问目标机

     

    by:授客 QQ1033553122 欢迎加入全国软件测试交流群:7156436

     

    实现功能 1

    测试环境 1

    代码实践 2

    注意 5

     

    实现功能

    远程登录Linux堡垒机,同Linux进行交互式操作,访问目标机

     

    测试环境

    Win7 64

     

    Python 3.3.4

     

    paramiko 1.15.2

    下载地址:

    https://pypi.python.org/pypi/paramiko/1.15.2

    https://pan.baidu.com/s/1i4SJ1CL

     

    cryptography-1.0-cp34-none-win_amd64.whl

    (如果paramiko可以正常安装完,则不需要安装该类库)

    下载地址:

    https://pypi.python.org/pypi/cryptography/1.0

    https://pan.baidu.com/s/1jIRBJvg

     

    安装好后,找到nt.py(本例中路径为:

    Libsite-packagespycrypto-2.6.1-py3.4-win-amd64.eggCryptoRandomOSRNG t.py),修改

    import winrandom

    from Crypto.Random.OSRNG import winrandom

    如下

    #import winrandom

    from Crypto.Random.OSRNG import winrandom

     

    以解决ImportError: No module named 'winrandom'错误

     

    说明:具体文件路径可能还得根据实际报错情况来确定,如下

    ............()

    "D:Program Filespython33libsite-packagesCryptoRandomOSRNG t.py", line 28, in

        import winrandom

    ImportError: No module named 'winrandom'

     

     

     

    VS2010

    因操作系统而异,可能需要安装VS2010,以解决包依赖问题

     

    需求

    SSH登录堡垒机后,根据提示输入相关信息,进入到目标机,如下


     

    代码实践

     

    #!/usr/bin/env/ python
    # -*- coding:utf-8 -*-

    __author__ = 'shouke'


    from paramiko.client import AutoAddPolicy
    from paramiko.client import SSHClient

    class MySSHClient:

        def __init__(self):
            self.ssh_client = SSHClient()

        # 连接登录
        def connect(self, hostname, port, username, password, host_via_by_bastion):
            try:
                self.ssh_client.set_missing_host_key_policy(AutoAddPolicy())
                self.ssh_client.connect(hostname=hostname, port=port, username=username, password=password, timeout=30)
                channel = self.ssh_client.invoke_shell()
                channel.settimeout(10) # 读、写操作超时时间,10

                ###################### 定制化开发 ###########

                print('正在通过堡垒机:%s 访问目标机:%s' % (hostname, host_via_by_bastion))
                host_via_by_bastion = host_via_by_bastion + ' '

                prompt_input_list = [{'Please enter your ID':'xxx255222 '}, {'Please enter your password': 'passwd123 '}, {'Please select your app ip':host_via_by_bastion}, {'select your user for login':'1 '}]


                end_flag1 = ']$'
                end_flag2 = ']#'

                stdout = ''  # 存放每次执行读取的内容
                for item in prompt_input_list:
                    prompt = list(item.keys())[0]
                    input = item[prompt]

                    flag = False
                    while stdout.find(prompt) == -1:
                        try:
                            stdout += channel.recv(65535).decode('utf-8')#.decode('utf-8')
                            flag = True
                        except Exception as e:
                            print('通过堡垒机:%s 访问目标机:%s 失败:%s' % (hostname, host_via_by_bastion, e))
                            flag = False
                            break
                    if flag:
                        channel.send(input)
                    else: # 未找到了对应提示
                        return [False, '通过堡垒机:%s 访问目标机:%s 失败,可能是读取命令返回结果超时,或者没找到对应输入提示'  % (hostname, host_via_by_bastion)]

                flag = False
                while not (stdout.rstrip().endswith(end_flag1) or stdout.rstrip().endswith(end_flag2)):
                    try:
                        stdout = channel.recv(2048).decode('utf-8')
                        flag = True
                    except Exception as e:
                        print('通过堡垒机:%s 访问目标机:%s 失败:%s' % (hostname, host_via_by_bastion, e))
                        flag = False
                        break
                if flag:
                    return [True, '']
                else: # 未找到了对应提示
                    return [False, '没出现成功登录提示符 ]$ ]# ']
                channel.close()
                return  [True, '']
            except Exception as e:
                return [False, '%s' % e]

        # 远程执行命令
        def exec_command(self, command, target_host, bastion_host):
            try:
                channel = self.ssh_client.invoke_shell()
                channel.settimeout(30) # 读、写操作超时时间,30

                end_flag1 = ']$'
                end_flag2 = ']#'
                ################ 定制化开发 ##############################
                if bastion_host != '':
                    print('正在通过堡垒机:%s 访问目标机:%s' % (bastion_host, target_host))
                    target_host_input = target_host + ' '
                    prompt_input_list = [{'Please enter your ID':'01367599 '}, {'Please enter your password': 'Huozhe2020 '}, {'Please select your app ip':target_host_input}, {'select your user for login':'1 '}]

                    stdout = ''  # 存放每次执行读取的内容
                    for item in prompt_input_list:
                        prompt = list(item.keys())[0]
                        input = item[prompt]

                        flag = False
                        while stdout.find(prompt) == -1:
                            try:
                                stdout += channel.recv(65535).decode('utf-8')#.decode('utf-8')
                                flag = True
                            except Exception as e:
                                print('通过堡垒机:%s 访问目标机:%s 失败:%s' %  (bastion_host, target_host, e))
                                flag = False
                                break
                        if flag:
                            channel.send(input)
                        else: # 未找到了对应提示
                            print('通过堡垒机:%s 访问目标机:%s 失败,可能是读取命令返回结果超时,或者没找到对应输入提示'  %  (bastion_host, target_host))
                            # return [False, '通过堡垒机:%s 访问目标机:%s 失败,可能是读取命令返回结果超时,或者没找到对应输入提示'  %  (bastion_host, target_host)]
                            return


                    while not (stdout.rstrip().endswith(end_flag1) or stdout.rstrip().endswith(end_flag2)):
                        try:
                            stdout = channel.recv(2048).decode('utf-8')
                        except Exception as e:
                            print('通过堡垒机:%s 访问目标机:%s 失败:没出现成功登录提示符 ]$ ]#' % (bastion_host, target_host))
                            return

                channel.send(command+' ')

                command_res = ''  # 存放每次执行读取的内容
                while not (command_res.endswith(end_flag1) or command_res.endswith(end_flag2)):
                    try:
                        command_res = channel.recv(2048).decode('utf-8').strip()
                    except Exception as e:
                        print('在目标机(IP: %s)上进行读取操作超时' % target_host)
                        break
                channel.close()
            except Exception as e:
               print('针对目标机:%s 执行命令: %s 出错 %s' % (target_host, command, e))

  • 相关阅读:
    toad 快捷键大全
    validateRequest 相关的作用
    为何有着良好设计的系统代码反而不容易看懂?
    致命错误: zlib.h:没有那个文件或目录
    全局变量相互依赖和初始化顺序的解决办法
    解决“possibly undefined macro: AC_PROG_LIBTOOL”
    Rsync完全配置
    undefined reference to `clock_gettime'
    解决“configure: line 2747: g: command not found”
    openssl编程轻松入门(含完整示例)
  • 原文地址:https://www.cnblogs.com/shouke/p/10157487.html
Copyright © 2011-2022 走看看