主要实现 SFTP方式 上传、下载文件;
无sftp 通过 scp方式上传、下载文件;
远程执行linux命令等;
''' paramiko '''
import re
import time
import traceback

import paramiko as param
from scp import SCPClient

# Reference: the same kind of remote command using a bare paramiko SSHClient.
#
#     ssh = param.SSHClient()
#     ssh.set_missing_host_key_policy(param.AutoAddPolicy())
#     ssh.connect(hostname='<host>', port=22, username='<user>', password='<password>')
#     stdin, stdout, stderr = ssh.exec_command('pwd')
#     print(stdout.read().decode())
#     print(stderr.read().decode())
#     ssh.close()

# Socket timeout (seconds) handed to every SCPClient created by SSHConnection.
_SCP_SOCKET_TIMEOUT = 15.0

# Dotted-quad pattern used to pull the first IPv4-looking token out of text.
_IPV4_PATTERN = re.compile(r'\d+\.\d+\.\d+\.\d+')


class SSHConnection():
    """Password-authenticated SSH connection built on one paramiko Transport.

    The transport is opened in ``__init__`` and shared by every helper:
    SFTP upload/download, SCP upload/download and remote command execution.
    Call ``close()`` when finished.
    """

    def __init__(self, host, user, passwd, port=22):
        """Open the transport and authenticate.

        Args:
            host: Remote server address.
            user: Login user name.
            passwd: Login password.
            port: SSH port, 22 by default.

        Raises:
            Exception: If authentication/connection setup fails; the original
                error is attached as ``__cause__``.
        """
        self.trans = param.Transport((host, port))
        try:
            self.trans.connect(username=user, password=passwd)
        except Exception as err:
            traceback.print_exc()
            # Do not leave a half-open transport behind on failure.
            self.trans.close()
            raise Exception("%s建立连接失败!" % host) from err

    def close(self):
        """Close the underlying transport."""
        self.trans.close()

    def upload(self, local_path, remote_path):
        """Upload a file over SFTP.

        Both ``local_path`` and ``remote_path`` must include the file name.
        """
        sftp_client = param.SFTPClient.from_transport(self.trans)
        try:
            sftp_client.put(local_path, remote_path)
        finally:
            # Release the SFTP channel even when the transfer fails.
            sftp_client.close()

    def download(self, remote_path, local_path):
        """Download a file over SFTP.

        Both ``remote_path`` and ``local_path`` must include the file name.
        """
        sftp_client = param.SFTPClient.from_transport(self.trans)
        try:
            sftp_client.get(remote_path, local_path)
        finally:
            # Release the SFTP channel even when the transfer fails.
            sftp_client.close()

    def scp_uploadFile(self, localFile, remoteFile):
        """Upload a file over SCP (for servers without SFTP).

        Best effort: a failed transfer prints its traceback and is not
        re-raised.
        """
        # SCPClient only needs a transport, so the already-authenticated one
        # is reused instead of building a separate SSHClient.
        scp_client = SCPClient(self.trans, socket_timeout=_SCP_SOCKET_TIMEOUT)
        try:
            scp_client.put(localFile, remoteFile)
        except Exception:
            traceback.print_exc()
        finally:
            scp_client.close()

    def scp_downloadFile(self, remoteFile, localFile):
        """Download a file over SCP (for servers without SFTP).

        Best effort: a failed transfer prints its traceback and is not
        re-raised.
        """
        scp_client = SCPClient(self.trans, socket_timeout=_SCP_SOCKET_TIMEOUT)
        try:
            scp_client.get(remoteFile, localFile)
        except Exception:
            traceback.print_exc()
        finally:
            scp_client.close()

    def do_cmd(self, command):
        """Run ``command`` on the remote host and return its decoded stdout.

        The command runs on its own session channel. Only that channel is
        closed afterwards, so the shared transport stays usable for later
        calls (closing an SSHClient wrapped around the transport would close
        the transport itself).
        """
        channel = self.trans.open_session()
        try:
            channel.exec_command(command)
            return channel.makefile("r", -1).read().decode()
        finally:
            channel.close()

    def get_remote_ip(self):
        """Return the first IPv4-looking address in the remote ``ifconfig`` output.

        Returns:
            The matched dotted-quad string, or ``None`` when the output
            contains no such token.
        """
        ifconfig = self.do_cmd('ifconfig')
        match = _IPV4_PATTERN.search(ifconfig)
        return match.group() if match else None


if __name__ == '__main__':
    ssh = SSHConnection('*.*.*.*', 'username', 'password')
    try:
        print(ssh)
        # Further usage examples:
        #     print('pwd result:', ssh.do_cmd('pwd'))
        #     print(ssh.do_cmd('ls'))
        #     ssh.scp_downloadFile('/home/oneu/ccpos/config/config.ini', r'f:\test\config.ini')
        #     ssh.scp_downloadFile('/home/oneu/ccpos/config/app.ini', r'f:\test\app.ini')
        #     ssh.upload('pp232.txt', '/root/jin/aa/pp233.txt')
        #     ssh.download('/root/jin/aa/lall.txt', r'F:\pylianxi\test\113.txt')
        #     print(ssh.do_cmd('ifconfig'))
        #     print('matched IP address:', ssh.get_remote_ip())
        # Raw string: '\t' and '\v' in a Windows path would otherwise be
        # interpreted as tab / vertical-tab escapes.
        ssh.scp_downloadFile('/home/oneu/ccpos/config/version.xml', r'f:\test\version.xml')
    finally:
        ssh.close()