zoukankan      html  css  js  c++  java
  • Python操作git

    一、Git版本管理

    很多公司在使用git的tag进行版本的管理。

    git tag -n                          查看本地Tag
    git tag -l 'v1.4.2.*'               查看本地Tag,模糊匹配
    git show v1.0                       查看
     
    git tag -a v1.0 -m '版本介绍'        本地创建Tag
    git tag -d v1.0                     删除Tag
    git checkout v.10                   切换tag
     
    git push origin  --tags             推送本地tag到远程
    git pull origin  --tags             获取远程tag
     
    git clone -b v0.10  http://...      指定版本克隆代码

    二、Python操作git

    """
    基于Python实现对git仓库进行操作,使用前需要安装模块:gitpython
        pip3 install gitpython
    """
     
    # ############## 1. clone下载代码 ##############
    """
    import os
    from git.repo import Repo
     
    download_path = os.path.join('code', 'fuck')
    Repo.clone_from('https://gitee.com/wupeiqi/fuck.git', to_path=download_path, branch='master')
    """
     
    # ############## 2. pull最新代码 ##############
    """
    import os
    from git.repo import Repo
     
    local_path = os.path.join('code', 'fuck')
    repo = Repo(local_path)
    repo.git.pull()
    """
    # ############## 3. 获取所有分支 ##############
    """
    import os
    from git.repo import Repo
     
    local_path = os.path.join('code', 'fuck')
    repo = Repo(local_path)
     
    branches = repo.remote().refs
    for item in branches:
        print(item.remote_head)
    """
    # ############## 4. 获取所有版本 ##############
    """
    import os
    from git.repo import Repo
     
    local_path = os.path.join('code', 'fuck')
    repo = Repo(local_path)
     
    for tag in repo.tags:
        print(tag.name)
    """
     
    # ############## 5. 获取所有commit ##############
    """
    import os
    from git.repo import Repo
     
    local_path = os.path.join('code', 'fuck')
    repo = Repo(local_path)
     
    commit_log = repo.git.log('--pretty={"commit":"%h","author":"%an","summary":"%s","date":"%cd"}', max_count=50,
                              date='format:%Y-%m-%d %H:%M')
    log_list = commit_log.split("
    ")
    real_log_list = [eval(item) for item in log_list]
    print(real_log_list)
    """
     
    # ############## 6. 切换分支 ##############
    """
    import os
    from git.repo import Repo
     
    local_path = os.path.join('code', 'fuck')
    repo = Repo(local_path)
     
    before = repo.git.branch()
    print(before)
    repo.git.checkout('master')
    after = repo.git.branch()
    print(after)
    repo.git.reset('--hard', '854ead2e82dc73b634cbd5afcf1414f5b30e94a8')
    """
     
    # ############## 7. 打包代码 ##############
    """
    with open(os.path.join('code', 'fuck.tar'), 'wb') as fp:
        repo.archive(fp)
    """
    View Code
    import os
    from git.repo import Repo
    from git.repo.fun import is_git_dir
    
    
    class GitRepository(object):
        """
        git仓库管理
        """
    
        def __init__(self, local_path, repo_url, branch='master'):
            self.local_path = local_path
            self.repo_url = repo_url
            self.repo = None
            self.initial(repo_url, branch)
    
        def initial(self, repo_url, branch):
            """
            初始化git仓库
            :param repo_url:
            :param branch:
            :return:
            """
            if not os.path.exists(self.local_path):
                os.makedirs(self.local_path)
    
            git_local_path = os.path.join(self.local_path, '.git')
            if not is_git_dir(git_local_path):
                self.repo = Repo.clone_from(repo_url, to_path=self.local_path, branch=branch)
            else:
                self.repo = Repo(self.local_path)
    
        def pull(self):
            """
            从线上拉最新代码
            :return:
            """
            self.repo.git.pull()
    
        def branches(self):
            """
            获取所有分支
            :return:
            """
            branches = self.repo.remote().refs
            return [item.remote_head for item in branches if item.remote_head not in ['HEAD', ]]
    
    
        def commits(self):
            """
            获取所有提交记录
            :return:
            """
            commit_log = self.repo.git.log('--pretty={"commit":"%h","author":"%an","summary":"%s","date":"%cd"}',
                                           max_count=50,
                                           date='format:%Y-%m-%d %H:%M')
            log_list = commit_log.split("
    ")
            return [eval(item) for item in log_list]
    
        def tags(self):
            """
            获取所有tag
            :return:
            """
            return [tag.name for tag in self.repo.tags]
    
        def change_to_branch(self, branch):
            """
            切换分值
            :param branch:
            :return:
            """
            self.repo.git.checkout(branch)
    
        def change_to_commit(self, branch, commit):
            """
            切换commit
            :param branch:
            :param commit:
            :return:
            """
            self.change_to_branch(branch=branch)
            self.repo.git.reset('--hard', commit)
    
        def change_to_tag(self, tag):
            """
            切换tag
            :param tag:
            :return:
            """
            self.repo.git.checkout(tag)
    
    
    if __name__ == '__main__':
        local_path = os.path.join('codes', 'luffycity')
        repo = GitRepository(local_path, 'https://gitee.com/wupeiqi/fuck.git')
        branch_list = repo.branches()
        print(branch_list)
        repo.change_to_branch('dev')
        repo.pull()
    git相关操作类

    三、Python解压缩文件

    在py2和py3中对文件进行解压缩稍有不同。

    • shutil 模块【压缩支持py2和py3,解压只支持py3】
    • tarfile / zipfile模块【支持py2和py3】 
    import shutil
    
    # 文件压缩
    """
    ret = shutil.make_archive(
        base_name="code/www",  # 压缩包文件路劲
        format='zip',  # “zip”, “tar”
        root_dir='code/fuck'  # 被压缩的文件件
    )
    print(ret)
    """
    
    # 解压文件
    """
    shutil._unpack_zipfile('code/www.zip', 'code/new')
    shutil._unpack_tarfile('code/www.tar', 'code/new')
    """
    shutil模块
    import zipfile
    
    # 压缩
    z = zipfile.ZipFile('laxi.zip', 'w')
    z.write('a.log')
    z.write('data.data')
    z.close()
    
    # 解压
    z = zipfile.ZipFile('laxi.zip', 'r')
    z.extractall()
    z.close()
    zipfile模块
    import tarfile
    
    # 压缩
    tar = tarfile.open('your.tar', 'w')
    tar.add('utils/codes/luffycity/a1.py')
    tar.add('utils/codes/luffycity/a3.py')
    tar.close()
    
    # 解压
    tar = tarfile.TarFile('code/www.tar', 'r')
    tar.extractall(path='/code/x1/')  # 可设置解压地址
    tar.close()
    tarfile模块

    四、执行本地命令

    import subprocess
     
    result = subprocess.check_output('ls -l', cwd='/Users/wupeiqi/PycharmProjects', shell=True)
    print(result.decode('utf-8'), type(result))

    五、Paramiko执行远程操作

    import paramiko
     
     
    class SSHProxy(object):
     
        def __init__(self, hostname, port, username, private_key_path):
            self.hostname = hostname
            self.port = port
            self.username = username
            self.private_key_path = private_key_path
     
            self.transport = None
     
        def open(self):
            private_key = paramiko.RSAKey.from_private_key_file(self.private_key_path)
            self.transport = paramiko.Transport((self.hostname, self.port))
            self.transport.connect(username=self.username, pkey=private_key)
     
        def close(self):
            self.transport.close()
     
        def command(self, cmd):
            ssh = paramiko.SSHClient()
            ssh._transport = self.transport
            stdin, stdout, stderr = ssh.exec_command(cmd)
            result = stdout.read()
            #ssh.close()
            return result
     
        def upload(self, local_path, remote_path):
            sftp = paramiko.SFTPClient.from_transport(self.transport)
            sftp.put(local_path, remote_path)
            sftp.close()
     
        def __enter__(self):
            self.open()
            return self
     
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()
     
     
    if __name__ == '__main__':
        with SSHProxy('10.211.55.25', 22, 'root', '/Users/wupeiqi/.ssh/id_rsa') as ssh:
            # v1 = ssh.command('sudo ifconfig')
            # print(v1)
            ssh.upload('your.tar', '/data/your.tar')  
    View Code

    六、杀进程

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import os
    import signal
    import subprocess
     
    output = subprocess.check_output("pgrep -f python", shell=True)
    pid_list = map(int, output.split())
    for pid in pid_list:
        os.kill(pid, signal.SIGKILL)
    View Code

    七、其他

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    """
    SaltAPI推送文件
    """
    
    # #### 基于SSH:API ####
    """
    from salt.client.ssh.client import SSHClient
    client = SSHClient()
    
    # 执行命令
    # result = client.cmd('*', 'cmd.run', ('ls',))
    
    # 调用grains
    # ret = client.cmd('*','grains.items')
    
    # 调用pillar
    # ret = client.cmd('*','pillar.items')
    
    # 执行 state
    # ret = client.cmd('*','state.sls',('fengfeng','pillar={"xxxx":"luffy"}'))
    
    # 发送文件
    # ret = client.cmd('*','cp.get_file',('salt://fengfeng/files/test.conf','/data/s1.conf'))
    
    # 发送文件
    # ret = client.cmd('*','cp.get_url',('http://www.pythonav.com/allstatic/imgs/mv/picture/2.jpeg','/data/s1.jpeg'))
    """
    # #### 基于Master:API ####
    """
    import salt.client
    local = salt.client.LocalClient()
    
    # 执行命令
    # result = client.cmd('*', 'cmd.run', ('ls',))
    
    # 调用grains
    # ret = client.cmd('*','grains.items')
    
    # 调用pillar
    # ret = client.cmd('*','pillar.items')
    
    # 执行 state
    # ret = client.cmd('*','state.sls',('fengfeng','pillar={"xxxx":"luffy"}'))
    
    # 发送文件
    # ret = client.cmd('*','cp.get_file',('salt://fengfeng/files/test.conf','/data/s1.conf'))
    
    # 发送文件
    # ret = client.cmd('*','cp.get_url',('http://www.pythonav.com/allstatic/imgs/mv/picture/2.jpeg','/data/s1.jpeg'))
    """
    salt操作
    ########  1. 执行命令
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import subprocess
    import commands
    
    result = subprocess.check_output('ls', cwd='/Users/wupeiqi/PycharmProjects', shell=True)
    print(result, type(result))
    
    ret = commands.getoutput("pgrep -f python")
    print(ret)
    
    ########  2. 解压缩文件
    # !/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import shutil
    
    # 文件压缩
    """
    ret = shutil.make_archive(
        base_name="/Users/wupeiqi/PycharmProjects/deploy27/前戏/wwwwwwwwww",
        format='gztar', # “zip”, “tar”, “bztar”,“gztar”
        root_dir='/Users/wupeiqi/PycharmProjects/deploy27/deploy'
    )
    print(ret)
    """
    
    # 文件解压
    import tarfile
    import zipfile
    
    # shutil._unpack_zipfile(file.stream, upload_path)
    """
    tar = tarfile.open('/Users/wupeiqi/PycharmProjects/deploy27/前戏/wwwwwwwwww.tar.gz', 'r')
    tar.extractall(path='/Users/wupeiqi/PycharmProjects/deploy27/前戏/dp/')  # 可设置解压地址
    tar.close()
    """
    
    ########  3. 遍历文件夹下的所有文件
    # !/usr/bin/env python
    # -*- coding:utf-8 -*-
    import os
    
    for item in os.listdir('/Users/wupeiqi/PycharmProjects/deploy27/deploy'):
        print(item)
    
    for item in os.walk('/Users/wupeiqi/PycharmProjects/deploy27/deploy'):
        print(item)
    
    ########  4. 重命名和删除
    # !/usr/bin/env python
    # -*- coding:utf-8 -*-
    import shutil
    
    # shutil.move('/Users/wupeiqi/PycharmProjects/deploy27/deploy1','/Users/wupeiqi/PycharmProjects/deploy27/deploy')
    
    # shutil.rmtree('/Users/wupeiqi/PycharmProjects/deploy27/t')
    
    
    ########  5. 杀进程
    # !/usr/bin/env python
    # -*- coding:utf-8 -*-
    import os
    import signal
    import subprocess
    import commands
    
    output = subprocess.check_output("pgrep -f python", shell=True)
    pid_list = map(int, output.split())
    for pid in pid_list:
        os.kill(pid, signal.SIGKILL)
    
    ########  6. salt推送文件
    # !/usr/bin/env python
    # -*- coding:utf-8 -*-
    """
    SaltAPI推送文件
    """
    
    # #### 基于SSH:API ####
    """
    from salt.client.ssh.client import SSHClient
    client = SSHClient()
    
    # 执行命令
    # result = client.cmd('*', 'cmd.run', ('ls',))
    
    # 调用grains
    # ret = client.cmd('*','grains.items')
    
    # 调用pillar
    # ret = client.cmd('*','pillar.items')
    
    # 执行 state
    # ret = client.cmd('*','state.sls',('fengfeng','pillar={"xxxx":"luffy"}'))
    
    # 发送文件
    # ret = client.cmd('*','cp.get_file',('salt://fengfeng/files/test.conf','/data/s1.conf'))
    
    # 发送文件
    # ret = client.cmd('*','cp.get_url',('http://www.pythonav.com/allstatic/imgs/mv/picture/2.jpeg','/data/s1.jpeg'))
    """
    # #### 基于Master:API ####
    """
    import salt.client
    local = salt.client.LocalClient()
    
    # 执行命令
    # result = client.cmd('*', 'cmd.run', ('ls',))
    
    # 调用grains
    # ret = client.cmd('*','grains.items')
    
    # 调用pillar
    # ret = client.cmd('*','pillar.items')
    
    # 执行 state
    # ret = client.cmd('*','state.sls',('fengfeng','pillar={"xxxx":"luffy"}'))
    
    # 发送文件
    # ret = client.cmd('*','cp.get_file',('salt://fengfeng/files/test.conf','/data/s1.conf'))
    
    # 发送文件
    # ret = client.cmd('*','cp.get_url',('http://www.pythonav.com/allstatic/imgs/mv/picture/2.jpeg','/data/s1.jpeg'))
    """
    发布功能
  • 相关阅读:
    Java 蓝桥杯 算法训练 貌似化学
    Java 蓝桥杯 算法训练 貌似化学
    Java 蓝桥杯 算法训练 字符串的展开 (JAVA语言实现)
    Java 蓝桥杯 算法训练 字符串的展开 (JAVA语言实现)
    Java 蓝桥杯 算法训练 字符串的展开 (JAVA语言实现)
    Java 蓝桥杯 算法训练 字符串的展开 (JAVA语言实现)
    Java 蓝桥杯 算法训练 字符串的展开 (JAVA语言实现)
    JAVA-蓝桥杯-算法训练-字符串变换
    Ceph:一个开源的 Linux PB 级分布式文件系统
    shell 脚本监控程序是否正在执行, 如果没有执行, 则自动启动该进程
  • 原文地址:https://www.cnblogs.com/bubu99/p/13111006.html
Copyright © 2011-2022 走看看