zoukankan      html  css  js  c++  java
  • paramiko模块和gitpython模块的使用

    paramiko模块

    通过ssh远程连接服务器并执行响应的命令,类似于Xshell

    ansible用来批量管理远程服务器,底层其实用的就是paramiko模块

    安装

    pip3 install paramiko
    

    使用

    paramiko模块即支持用户名密码的方式操作服务器

    也支持公钥私钥的方式操作服务器

    并且实际生产中公钥私钥用的较多,因为密码是敏感信息

    执行命令

    """执行命令  用户名和密码的方式"""
    # 创建对象
    ssh = paramiko.SSHClient()
    # 允许链接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    
    # 链接服务器
    ssh.connect(hostname='172.16.219.173',port=22,username='root',password='jason123')
    
    # 执行命令
    stdin, stdout, stderr = ssh.exec_command('ls /')
    """
    stdin用来输入额外的命令 
        yum install ansible  额外的命令-y
    stdout命令的返回结果  正确
    stderr命令的返回结果  错误
    """
    res = stdout.read()  # 网络传输过来的二进制数据
    print(res.decode('utf-8'))
    
    # 关闭链接
    ssh.close()
    '''
    不加下面的这句话报错:TypeError: 'NoneType' object is not callable
    具体原因不详:在必应上查的结果是paramiko模块使用过程中产生的垃圾没被回收(垃圾回收)
    '''
    del ssh, stdin, stdout, stderr
    
    
    
    
    """
    1.生成
    ssh-keygen -t rsa
    
    2.将你的公钥拷贝到远程服务器
    ssh-copy-id -i ~/.ssh/id_rsa.pub 用户名@服务器地址
    
    3.可以直接使用私钥路径 也可以单独拷贝出来生产一个文件
    私钥路径:/Users/jason/.ssh/id_rsa
    """
    # 公钥和私钥(先讲公钥保存到服务器上)
    import paramiko
    
    # 读取本地私钥
    private_key = paramiko.RSAKey.from_private_key_file('a.txt')
    
    # 创建SSH对象
    ssh = paramiko.SSHClient()
    # 允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 连接服务器
    ssh.connect(hostname='172.16.219.173', port=22, username='root', pkey=private_key)
    
    # 执行命令
    stdin, stdout, stderr = ssh.exec_command('ls /')
    # 获取命令结果
    result = stdout.read()
    print(result.decode('utf-8'))
    # 关闭连接
    ssh.close()
    '''
    不加下面的这句话报错:TypeError: 'NoneType' object is not callable
    具体原因不详:在必应上查的结果是paramiko模块使用过程中产生的垃圾没被回收(垃圾回收)
    '''
    del ssh, stdin, stdout, stderr
    

    上传下载文件

    """上传下载文件  用户名和密码的方式"""
    import paramiko
    
    # 用户名和密码
    transport = paramiko.Transport(('172.16.219.173', 22))
    transport.connect(username='root', password='jason123')
    
    sftp = paramiko.SFTPClient.from_transport(transport)
    
    # 上传文件
    # sftp.put("a.txt", '/data/b.txt')  # 注意上传文件到远程某个文件下 文件必须存在
    
    # 下载文件
    sftp.get('/data/b.txt', 'c.txt')  # 将远程文件下载到本地并重新命令
    transport.close()
    
    
    
    """上传下载文件 公钥私钥的方式"""
    # 公钥和私钥
    import paramiko
    private_key = paramiko.RSAKey.from_private_key_file('c.txt')
    transport = paramiko.Transport(('172.16.219.173', 22))
    transport.connect(username='root', pkey=private_key)
    sftp = paramiko.SFTPClient.from_transport(transport)
    # 将location.py 上传至服务器 /tmp/test.py
    # sftp.put('manage.py', '/data/temp.py')
    
    # 将remove_path 下载到本地 local_path
    # sftp.get('remove_path', 'local_path')
    transport.close()
    

    类封装

    """
    我现在即想执行命令又想上传下载文件并且多次执行
    yum install ansible
    yum install redis
    yum install redis
    upload
    
    单链接下完成多部操作
    """
    # 下面写的类 你只要只要是想通过paramiko链接服务器都可以使用
    import paramiko
    
    
    class SSHProxy(object):
        def __init__(self, hostname, port, username, password):
            self.hostname = hostname
            self.port = port
            self.username = username
            self.password = password
            self.transport = None
    
        def open(self):  # 给对象赋值一个上传下载文件对象连接
            self.transport = paramiko.Transport((self.hostname, self.port))
            self.transport.connect(username=self.username, password=self.password)
    
        def command(self, cmd):  # 正常执行命令的连接  至此对象内容就既有执行命令的连接又有上传下载链接
            ssh = paramiko.SSHClient()
            ssh._transport = self.transport
    
            stdin, stdout, stderr = ssh.exec_command(cmd)
            result = stdout.read()
            return result
    
        def upload(self, local_path, remote_path):
            sftp = paramiko.SFTPClient.from_transport(self.transport)
            sftp.put(local_path, remote_path)
            sftp.close()
    
        def close(self):
            self.transport.close()
    
        def __enter__(self):  # 对象执行with上下文会自动触发
            # 
            # print('触发了enter')
            self.open()
            return self  # 这里发挥上面with语法内的as后面拿到的就是什么
            # return 123
    
    
        def __exit__(self, exc_type, exc_val, exc_tb):  # with执行结束自动触发
            # print('触发了exit')
            self.close()
    """
    上面这个类在使用的时候 需要先执行open方法
    obj = SSHProxy()
    obj.open()  文件对象 链接服务器
    
    obj.command()
    obj.command()
    obj.upload()
    obj.upload()
    
    obj.close()  关闭链接
    
    
    文件操作
    f = open()
    
    f.write()
    f.read()
    
    f.close()
    
    with上下文管理
    with open() as f:
        ...
    """
    
    # 对象默认是不支持with语法的   
    # obj = SSHProxy('172.16.219.173',22,'root','jason123')
    # with obj as f:
    #     # print('进入with代码块')
    #     print(f)
    if __name__ == '__main__': 
        with SSHProxy('172.16.219.173',22,'root','jason123') as ssh:
            ssh.command()
            ssh.command()
            ssh.command()
            ssh.upload()
    

    面向对象面试题

    """
    面试题
    请在Context类中添加代码完成该类的实现
    class Context:
        pass
    
    with Context() as ctx:
        ctx.do_something()
    """
    class Context:
        def __enter__(self):
            return self
        
        def __exit__(self, exc_type, exc_val, exc_tb):
            pass
        
        def do_something(self):
            pass
    
    with Context() as ctx:
        ctx.do_something()
    

    python操作git

    安装

    pip3 install gitpython
    

    基本使用

    from git.repo import Repo
    import os
    
    
    # 从远程仓库下载代码到本地   pull/clone
    download_path = os.path.join('jason','NB')
    # 从远程仓库将代码下载到上面创建的目录中
    Repo.clone_from('https://github.com/DominicJi/TeachTest.git',to_path=download_path,branch='master')
    

    更多操作

    # ############## 2. pull最新代码 ##############
    import os
    from git.repo import Repo
     
    local_path = os.path.join('jason', 'NB')
    repo = Repo(local_path)
    repo.git.pull()
    
    # ############## 3. 获取所有分支 ##############
    import os
    from git.repo import Repo
     
    local_path = os.path.join('jason', 'NB')
    repo = Repo(local_path)
     
    branches = repo.remote().refs
    for item in branches:
        print(item.remote_head)
        
    # ############## 4. 获取所有版本 ##############
    import os
    from git.repo import Repo
     
    local_path = os.path.join('jason', 'NB')
    repo = Repo(local_path)
     
    for tag in repo.tags:
        print(tag.name)
    
    # ############## 5. 获取所有commit ##############
    import os
    from git.repo import Repo
     
    local_path = os.path.join('jason', 'NB')
    repo = Repo(local_path)
     
    # 将所有提交记录结果格式成json格式字符串 方便后续反序列化操作
    commit_log = repo.git.log('--pretty={"commit":"%h","author":"%an","summary":"%s","date":"%cd"}', max_count=50,
                              date='format:%Y-%m-%d %H:%M')
    log_list = commit_log.split("
    ")
    real_log_list = [eval(item) for item in log_list]
    print(real_log_list)
     
    # ############## 6. 切换分支 ##############
    import os
    from git.repo import Repo
     
    local_path = os.path.join('jason', 'NB')
    repo = Repo(local_path)
     
    before = repo.git.branch()
    print(before)
    repo.git.checkout('master')
    after = repo.git.branch()
    print(after)
    repo.git.reset('--hard', '854ead2e82dc73b634cbd5afcf1414f5b30e94a8')
     
    # ############## 7. 打包代码 ##############
    with open(os.path.join('jason', 'NB.tar'), 'wb') as fp:
        repo.archive(fp)
    

    封装之后的版本

    import os
    from git.repo import Repo
    from git.repo.fun import is_git_dir
    
    
    class GitRepository(object):
        """
        git仓库管理
        """
    
        def __init__(self, local_path, repo_url, branch='master'):
            self.local_path = local_path
            self.repo_url = repo_url
            self.repo = None
            self.initial(repo_url, branch)
    
        def initial(self, repo_url, branch):
            """
            初始化git仓库
            :param repo_url:
            :param branch:
            :return:
            """
            if not os.path.exists(self.local_path):
                os.makedirs(self.local_path)
    
            git_local_path = os.path.join(self.local_path, '.git')
            if not is_git_dir(git_local_path):
                self.repo = Repo.clone_from(repo_url, to_path=self.local_path, branch=branch)
            else:
                self.repo = Repo(self.local_path)
    
        def pull(self):
            """
            从线上拉最新代码
            :return:
            """
            self.repo.git.pull()
    
        def branches(self):
            """
            获取所有分支
            :return:
            """
            branches = self.repo.remote().refs
            return [item.remote_head for item in branches if item.remote_head not in ['HEAD', ]]
    
        def commits(self):
            """
            获取所有提交记录
            :return:
            """
            commit_log = self.repo.git.log('--pretty={"commit":"%h","author":"%an","summary":"%s","date":"%cd"}',
                                           max_count=50,
                                           date='format:%Y-%m-%d %H:%M')
            log_list = commit_log.split("
    ")
            return [eval(item) for item in log_list]
    
        def tags(self):
            """
            获取所有tag
            :return:
            """
            return [tag.name for tag in self.repo.tags]
    
        def change_to_branch(self, branch):
            """
            切换分值
            :param branch:
            :return:
            """
            self.repo.git.checkout(branch)
    
        def change_to_commit(self, branch, commit):
            """
            切换commit
            :param branch:
            :param commit:
            :return:
            """
            self.change_to_branch(branch=branch)
            self.repo.git.reset('--hard', commit)
    
        def change_to_tag(self, tag):
            """
            切换tag
            :param tag:
            :return:
            """
            self.repo.git.checkout(tag)
        
       
    
    if __name__ == '__main__':
        local_path = os.path.join('codes', 'liren')
        repo = GitRepository(local_path,remote_path)
        branch_list = repo.branches()
        print(branch_list)
        repo.change_to_branch('dev')
        repo.pull()
    

    总结

    """
    后期你在接触一些模块的时候 也应该想到将该模块所有的方法整合到一起
    方便以后的调用
    """
    
  • 相关阅读:
    float实例讲解
    Eclipse导出可执行Java工程/可执行Jar文件(包含第三方Jar包)
    eclipse sql server 导出excel文件
    sql server和eclipse连接代码
    sql server和eclipse连接问题
    sql server下载教程
    初识eclipse-java
    java下载和环境变量配置
    sql sever登录问题
    Makefile文件(DE1-soc软件实验”hello_word")
  • 原文地址:https://www.cnblogs.com/zhangchaocoming/p/13211475.html
Copyright © 2011-2022 走看看