zoukankan      html  css  js  c++  java
  • paramiko远程连接服务器

    paramiko远程连接服务器

    1.安装:

    pip install paramiko
    

    2.连接

    • 基于账号密码连接
    import paramiko
    
    # 创建SSH对象
    ssh = paramiko.SSHClient()
    # 允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 连接服务器
    ssh.connect(hostname="要连接主机IP", port=22, username='root', password='用户密码')
    
    # 执行命令
    # stdin:标准输入(就是你输入的命令);stdout:标准输出(就是命令执行结果);stderr:标准错误(命令执行过程中如果出错了就把错误打到这里),stdout和stderr仅会输出一个
    stdin, stdout, stderr = ssh.exec_command('df')
    # 获取命令结果
    result = (stdout.read().decode('utf-8'))  # 这个有问题,如果执行的命令是错误的,会不显示错误,可以修改一下,先判断stdout有没有值,如果输出没有,就显示错误
    print(result)
    # 关闭连接
    ssh.close()
    
    • 基于公钥连接
    import paramiko
    # 密钥
    private_key = paramiko.RSAKey.from_private_key_file(r'J:westCoastKey.pem')
    # 创建SSH对象
    ssh = paramiko.SSHClient()
    # 允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 连接服务器
    ssh.connect(hostname='要连接主机ip', port=22, username='root', pkey=private_key)
    # 执行命令
    stdin, stdout, stderr = ssh.exec_command('df')
    # 获取命令结果
    result = stdout.read()
    print(result.decode())
    # 关闭连接
    ssh.close()
    

    3.一个简单封装:

    import paramiko
    
    class SSHConnection(object):
        def __init__(self,host,port,username,pwd):
            self.host = host
            self.port = port
            self.username = username
            self.pwd = pwd
        def connect(self):
            """建立连接"""
            transport = paramiko.Transport(self.host,self.port)
            transport.connect(username=self.username,password=self.pwd)
            self.__transport = transport
        def close(self):
            """
            关闭ssh连接
            :return:
            """
            self.__transport.close()
        def cmd(self,command):
            ssh = paramiko.SSHClient()
            ssh._transport = self.__transport
            stdin, stdout, stderr = ssh.exec_command(command)
            result = stdout.read()
            return str(result,encoding="utf-8")
        def download(self, remote_path, local_path):
            """
            下载
            :param remote_path: 是远端文件路径
            :param local_path: 下载本地文件路径
            :return:
            """
            sftp = paramiko.SFTPClient.from_transport(self.__transport)
            sftp.get(remote_path,local_path)
        def upload(self, local_path, target_path):
            """
            上传信息
            :param local_path: 本地文件路径
            :param target_path: 远端文件路径
            :return:
            """
            sftp = paramiko.SFTPClient.from_transport(self.__transport)
            sftp.put(local_path, target_path)
    def main():
        ssh = SSHConnection("要连接主机IP",22,"root","登陆密码")
        ssh.connect()
        ssh.cmd("ps -ef | grep nginx")
        ssh.download("远程文件路径","本地文件路径")
        ssh.upload("本地文件路径","远程文件路径")
        ssh.close()
    if __name__ == '__main__':
        main()
    

    4.像xshell一样操作服务器

    import paramiko
    import re
    from time import sleep
    
    # 定义一个类,表示一台远端linux主机
    class Linux(object):
        # 通过IP, 用户名,密码,超时时间初始化一个远程Linux主机
        def __init__(self, ip, username, password, timeout=30):
            self.ip = ip
            self.username = username
            self.password = password
            self.timeout = timeout
            # transport和chanel
            self.t = ''
            self.chan = ''
            # 链接失败的重试次数
            self.try_times = 3
    
        # 调用该方法连接远程主机
        def connect(self):
            while True:
                # 连接过程中可能会抛出异常,比如网络不通、链接超时
                try:
                    self.t = paramiko.Transport(sock=(self.ip, 22))
                    self.t.connect(username=self.username, password=self.password)
                    self.chan = self.t.open_session()
                    self.chan.settimeout(self.timeout)
                    self.chan.get_pty()
                    self.chan.invoke_shell()
                    # 如果没有抛出异常说明连接成功,直接返回
                    print('连接%s成功' % self.ip)
                    # 接收到的网络数据解码为str
                    print(self.chan.recv(65535).decode('utf-8'))
                    return
                # 这里不对可能的异常如socket.error, socket.timeout细化,直接一网打尽
                except Exception:
                    if self.try_times != 0:
                        print('连接%s失败,进行重试' %self.ip)
                        self.try_times -= 1
                    else:
                        print('重试3次失败,结束程序')
                        exit(1)
    
        # 断开连接
        def close(self):
            self.chan.close()
            self.t.close()
    
        # 发送要执行的命令
        def send(self, cmd):
            cmd += '
    '
            # 通过命令执行提示符来判断命令是否执行完成
            p = re.compile(r']#')
    
            result = ''
            # 发送要执行的命令
            self.chan.send(cmd)
            # 回显很长的命令可能执行较久,通过循环分批次取回回显
            while True:
                sleep(0.5)
                ret = self.chan.recv(65535)
                ret = ret.decode('utf-8')
                result += ret
                if p.search(ret):
                    result = result.strip(cmd)
                    print(result, end=" ")
                    return result
    if __name__ == '__main__':
        host = Linux("要连接服务器IP", "用户名", "密码")
        host.connect()
        while True:
            cmd = input()
            if cmd == "exit":
                host.close()
                break
            host.send(cmd)
    
  • 相关阅读:
    SAP CRM One Order函数CRM_Object_FILL_OW的设计原理
    SAP CRM One Order函数CHANGE_OW的设计原理
    SAP CRM One Order函数SAVE_EC的设计原理
    POJ-1125 Stockbroker Grapevine
    GStreamer 1.0 series序列示例
    H265与ffmpeg改进开发
    FFmpeg扩展开发
    在Yolov5 Yolov4 Yolov3 TensorRT 实现Implementation
    TensorRT 基于Yolov3的开发
    大规模数据处理Apache Spark开发
  • 原文地址:https://www.cnblogs.com/xujunkai/p/12380091.html
Copyright © 2011-2022 走看看