1.Python代码操作git
-
安装
pip3 install gitpython
-
操作git
import os from git.repo import Repo # gitpython def clone(): download_path = os.path.join('codes', 'fuck') # git clone -b master https://gitee.com/wupeiqi/xxoo.git # git clone -b v1 https://gitee.com/wupeiqi/xxoo.git Repo.clone_from('https://gitee.com/wupeiqi/xxoo.git', to_path=download_path, branch='master') def pull(): # git pull origin master local_path = os.path.join('codes', 'fuck') repo = Repo(local_path) repo.git.pull() def tag_list(): local_path = os.path.join('codes', 'fuck') repo = Repo(local_path) tag_list = [item.name for item in repo.tags] print(tag_list) def commits_list(): import json local_path = os.path.join('codes', 'fuck') repo = Repo(local_path) commit_log = repo.git.log('--pretty={"commit":"%h","author":"%an","summary":"%s","date":"%cd"}',date='format:%Y-%m-%d %H:%M',max_count=50) commit_list = [json.loads(item) for item in commit_log.split(' ') ] print(commit_list) def branches_list(): pull() local_path = os.path.join('codes', 'fuck') repo = Repo(local_path) branches = repo.remote().refs branch_list = [item.remote_head for item in branches if item.remote_head != "HEAD"] def checkout_to_branch(branch='dev'): local_path = os.path.join('codes', 'fuck') repo = Repo(local_path) # before = repo.git.branch() # print('当前所在分支:',before) repo.git.checkout(branch) # after = repo.git.branch() # print('当前所在分支:',after) def checkout_to_commit(commit='ec1d728'): # commits_list() local_path = os.path.join('codes', 'fuck') repo = Repo(local_path) repo.git.reset('--hard', commit)
-
封装到一个类中,以后当做工具。
import os import json from git.repo import Repo from git.repo.fun import is_git_dir class GitRepository(object): """ git仓库管理 """ def __init__(self, local_path, repo_url, branch='master'): #本地存储代码的路径 E:Django_issuecode_issuecodesxxx self.local_path = local_path #远程仓库地址 https://gite.com/zhanyu/xxx.git self.repo_url = repo_url # 这个默认为空下边自动赋值 self.repo = None # 调用下边的方法 self.initial(branch) def initial(self,branch): """ 初始化git仓库 :param repo_url: :param branch: :return: """ # 判断本地的仓库中有没有文件,没有就创建 if not os.path.exists(self.local_path): os.makedirs(self.local_path) # codes/luffycity/.git git_local_path = os.path.join(self.local_path, '.git') # 用来判断是不是有这个文件 if not is_git_dir(git_local_path): # 第一次拉文件的时候是需要克隆的,不能直接拉取 self.repo = Repo.clone_from(self.repo_url, to_path=self.local_path, branch=branch) else: # 如果有这个.git文件就实例化Repo这个类; self.repo = Repo(self.local_path) def pull(self): """ 从线上拉最新代码 :return: """ self.repo.git.pull() def branches(self): """ 获取所有分支 :return: """ branches = self.repo.remote().refs return [item.remote_head for item in branches if item.remote_head not in ['HEAD', ]] def commits(self): """ 获取所有提交记录 :return: """ commit_log = self.repo.git.log('--pretty={"commit":"%h","author":"%an","summary":"%s","date":"%cd"}', max_count=50, date='format:%Y-%m-%d %H:%M') return [json.loads(item) for item in commit_log.split(' ') ] def tags(self): """ 获取所有tag :return: """ return [tag.name for tag in self.repo.tags] def change_to_branch(self, branch): """ 切换分值 :param branch: :return: """ self.repo.git.checkout(branch) def change_to_commit(self, branch, commit): """ 切换commit :param branch: :param commit: :return: """ self.change_to_branch(branch=branch) self.repo.git.reset('--hard', commit) def change_to_tag(self, tag): """ 切换tag :param tag: :return: """ self.repo.git.checkout(tag) if __name__ == '__main__': local_path = os.path.join('codes', 'luffycity') repo_object = GitRepository(local_path, 'https://gitee.com/wupeiqi/xxoo.git')
2.解压缩文件
import shutil
# 压缩文件: py2、py3
"""
abs_file_path = shutil.make_archive(
base_name="files/ww", # 压缩包文件路劲
format='tar', # “zip”, “tar”
root_dir='codes/luffycity' # 被压缩的文件目录
)
print(abs_file_path)
"""
# 解压缩:py3
# shutil._unpack_zipfile('files/ww.zip', 'xxxxxx/new')
# shutil._unpack_tarfile('files/ww.zip', 'xxxxxx/new')
# 解压缩:py2/py3
"""
import zipfile
z = zipfile.ZipFile('files/ww.zip', 'r')
z.extractall(path='xxxxxx/luffy')
z.close()
import tarfile
tar = tarfile.TarFile('code/www.tar', 'r')
tar.extractall(path='/code/x1/') # 可设置解压地址
tar.close()
"""
3.基于paramiko操作远程服务器
import paramiko
class SSHProxy(object):
def __init__(self, hostname, port, username, private_key_path):
self.hostname = hostname
self.port = port
self.username = username
self.private_key_path = private_key_path
self.transport = None
def open(self):
private_key = paramiko.RSAKey.from_private_key_file(self.private_key_path)
self.transport = paramiko.Transport((self.hostname, self.port))
self.transport.connect(username=self.username, pkey=private_key)
def close(self):
self.transport.close()
def command(self, cmd):
ssh = paramiko.SSHClient()
ssh._transport = self.transport
stdin, stdout, stderr = ssh.exec_command(cmd)
result = stdout.read()
ssh.close()
return result
def upload(self, local_path, remote_path):
sftp = paramiko.SFTPClient.from_transport(self.transport)
sftp.put(local_path, remote_path)
sftp.close()
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if __name__ == '__main__':
with SSHProxy('10.211.55.25', 22, 'root', '/Users/wupeiqi/.ssh/id_rsa') as proxy:
proxy.upload('xx','xx')
proxy.command('ifconfig')
proxy.command('ifconfig')
proxy.upload('xx', 'xx')
with SSHProxy('10.211.55.26', 22, 'root', '/Users/wupeiqi/.ssh/id_rsa') as proxy:
proxy.upload('xx','xx')
proxy.command('ifconfig')
proxy.command('ifconfig')
proxy.upload('xx', 'xx')
4.本地执行命令
import subprocess
result = subprocess.check_output('dir', cwd='D:wupeiqicodeziwencodes', shell=True)
print(result.decode('gbk'), type(result))