paramiko模块
通过ssh远程连接服务器并执行响应的命令,类似于Xshell
ansible用来批量管理远程服务器,底层其实用的就是paramiko模块
安装
pip3 install paramiko
使用
paramiko模块即支持用户名密码的方式操作服务器
也支持公钥私钥的方式操作服务器
并且实际生产中公钥私钥用的较多,因为密码是敏感信息
执行命令
"""执行命令 用户名和密码的方式"""
# 创建对象
ssh = paramiko.SSHClient()
# 允许链接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 链接服务器
ssh.connect(hostname='172.16.219.173',port=22,username='root',password='jason123')
# 执行命令
stdin, stdout, stderr = ssh.exec_command('ls /')
"""
stdin用来输入额外的命令
yum install ansible 额外的命令-y
stdout命令的返回结果 正确
stderr命令的返回结果 错误
"""
res = stdout.read() # 网络传输过来的二进制数据
print(res.decode('utf-8'))
# 关闭链接
ssh.close()
'''
不加下面的这句话报错:TypeError: 'NoneType' object is not callable
具体原因不详:在必应上查的结果是paramiko模块使用过程中产生的垃圾没被回收(垃圾回收)
'''
del ssh, stdin, stdout, stderr
"""
1.生成
ssh-keygen -t rsa
2.将你的公钥拷贝到远程服务器
ssh-copy-id -i ~/.ssh/id_rsa.pub 用户名@服务器地址
3.可以直接使用私钥路径 也可以单独拷贝出来生产一个文件
私钥路径:/Users/jason/.ssh/id_rsa
"""
# 公钥和私钥(先讲公钥保存到服务器上)
import paramiko
# 读取本地私钥
private_key = paramiko.RSAKey.from_private_key_file('a.txt')
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='172.16.219.173', port=22, username='root', pkey=private_key)
# 执行命令
stdin, stdout, stderr = ssh.exec_command('ls /')
# 获取命令结果
result = stdout.read()
print(result.decode('utf-8'))
# 关闭连接
ssh.close()
'''
不加下面的这句话报错:TypeError: 'NoneType' object is not callable
具体原因不详:在必应上查的结果是paramiko模块使用过程中产生的垃圾没被回收(垃圾回收)
'''
del ssh, stdin, stdout, stderr
上传下载文件
"""上传下载文件 用户名和密码的方式"""
import paramiko
# 用户名和密码
transport = paramiko.Transport(('172.16.219.173', 22))
transport.connect(username='root', password='jason123')
sftp = paramiko.SFTPClient.from_transport(transport)
# 上传文件
# sftp.put("a.txt", '/data/b.txt') # 注意上传文件到远程某个文件下 文件必须存在
# 下载文件
sftp.get('/data/b.txt', 'c.txt') # 将远程文件下载到本地并重新命令
transport.close()
"""上传下载文件 公钥私钥的方式"""
# 公钥和私钥
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('c.txt')
transport = paramiko.Transport(('172.16.219.173', 22))
transport.connect(username='root', pkey=private_key)
sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
# sftp.put('manage.py', '/data/temp.py')
# 将remove_path 下载到本地 local_path
# sftp.get('remove_path', 'local_path')
transport.close()
类封装
"""
我现在即想执行命令又想上传下载文件并且多次执行
yum install ansible
yum install redis
yum install redis
upload
单链接下完成多部操作
"""
# 下面写的类 你只要只要是想通过paramiko链接服务器都可以使用
import paramiko
class SSHProxy(object):
def __init__(self, hostname, port, username, password):
self.hostname = hostname
self.port = port
self.username = username
self.password = password
self.transport = None
def open(self): # 给对象赋值一个上传下载文件对象连接
self.transport = paramiko.Transport((self.hostname, self.port))
self.transport.connect(username=self.username, password=self.password)
def command(self, cmd): # 正常执行命令的连接 至此对象内容就既有执行命令的连接又有上传下载链接
ssh = paramiko.SSHClient()
ssh._transport = self.transport
stdin, stdout, stderr = ssh.exec_command(cmd)
result = stdout.read()
return result
def upload(self, local_path, remote_path):
sftp = paramiko.SFTPClient.from_transport(self.transport)
sftp.put(local_path, remote_path)
sftp.close()
def close(self):
self.transport.close()
def __enter__(self): # 对象执行with上下文会自动触发
#
# print('触发了enter')
self.open()
return self # 这里发挥上面with语法内的as后面拿到的就是什么
# return 123
def __exit__(self, exc_type, exc_val, exc_tb): # with执行结束自动触发
# print('触发了exit')
self.close()
"""
上面这个类在使用的时候 需要先执行open方法
obj = SSHProxy()
obj.open() 文件对象 链接服务器
obj.command()
obj.command()
obj.upload()
obj.upload()
obj.close() 关闭链接
文件操作
f = open()
f.write()
f.read()
f.close()
with上下文管理
with open() as f:
...
"""
# 对象默认是不支持with语法的
# obj = SSHProxy('172.16.219.173',22,'root','jason123')
# with obj as f:
# # print('进入with代码块')
# print(f)
if __name__ == '__main__':
with SSHProxy('172.16.219.173',22,'root','jason123') as ssh:
ssh.command()
ssh.command()
ssh.command()
ssh.upload()
面向对象面试题
"""
面试题
请在Context类中添加代码完成该类的实现
class Context:
pass
with Context() as ctx:
ctx.do_something()
"""
class Context:
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def do_something(self):
pass
with Context() as ctx:
ctx.do_something()
python操作git
安装
pip3 install gitpython
基本使用
from git.repo import Repo
import os
# 从远程仓库下载代码到本地 pull/clone
download_path = os.path.join('jason','NB')
# 从远程仓库将代码下载到上面创建的目录中
Repo.clone_from('https://github.com/DominicJi/TeachTest.git',to_path=download_path,branch='master')
更多操作
# ############## 2. pull最新代码 ##############
import os
from git.repo import Repo
local_path = os.path.join('jason', 'NB')
repo = Repo(local_path)
repo.git.pull()
# ############## 3. 获取所有分支 ##############
import os
from git.repo import Repo
local_path = os.path.join('jason', 'NB')
repo = Repo(local_path)
branches = repo.remote().refs
for item in branches:
print(item.remote_head)
# ############## 4. 获取所有版本 ##############
import os
from git.repo import Repo
local_path = os.path.join('jason', 'NB')
repo = Repo(local_path)
for tag in repo.tags:
print(tag.name)
# ############## 5. 获取所有commit ##############
import os
from git.repo import Repo
local_path = os.path.join('jason', 'NB')
repo = Repo(local_path)
# 将所有提交记录结果格式成json格式字符串 方便后续反序列化操作
commit_log = repo.git.log('--pretty={"commit":"%h","author":"%an","summary":"%s","date":"%cd"}', max_count=50,
date='format:%Y-%m-%d %H:%M')
log_list = commit_log.split("
")
real_log_list = [eval(item) for item in log_list]
print(real_log_list)
# ############## 6. 切换分支 ##############
import os
from git.repo import Repo
local_path = os.path.join('jason', 'NB')
repo = Repo(local_path)
before = repo.git.branch()
print(before)
repo.git.checkout('master')
after = repo.git.branch()
print(after)
repo.git.reset('--hard', '854ead2e82dc73b634cbd5afcf1414f5b30e94a8')
# ############## 7. 打包代码 ##############
with open(os.path.join('jason', 'NB.tar'), 'wb') as fp:
repo.archive(fp)
封装之后的版本
import os
from git.repo import Repo
from git.repo.fun import is_git_dir
class GitRepository(object):
"""
git仓库管理
"""
def __init__(self, local_path, repo_url, branch='master'):
self.local_path = local_path
self.repo_url = repo_url
self.repo = None
self.initial(repo_url, branch)
def initial(self, repo_url, branch):
"""
初始化git仓库
:param repo_url:
:param branch:
:return:
"""
if not os.path.exists(self.local_path):
os.makedirs(self.local_path)
git_local_path = os.path.join(self.local_path, '.git')
if not is_git_dir(git_local_path):
self.repo = Repo.clone_from(repo_url, to_path=self.local_path, branch=branch)
else:
self.repo = Repo(self.local_path)
def pull(self):
"""
从线上拉最新代码
:return:
"""
self.repo.git.pull()
def branches(self):
"""
获取所有分支
:return:
"""
branches = self.repo.remote().refs
return [item.remote_head for item in branches if item.remote_head not in ['HEAD', ]]
def commits(self):
"""
获取所有提交记录
:return:
"""
commit_log = self.repo.git.log('--pretty={"commit":"%h","author":"%an","summary":"%s","date":"%cd"}',
max_count=50,
date='format:%Y-%m-%d %H:%M')
log_list = commit_log.split("
")
return [eval(item) for item in log_list]
def tags(self):
"""
获取所有tag
:return:
"""
return [tag.name for tag in self.repo.tags]
def change_to_branch(self, branch):
"""
切换分值
:param branch:
:return:
"""
self.repo.git.checkout(branch)
def change_to_commit(self, branch, commit):
"""
切换commit
:param branch:
:param commit:
:return:
"""
self.change_to_branch(branch=branch)
self.repo.git.reset('--hard', commit)
def change_to_tag(self, tag):
"""
切换tag
:param tag:
:return:
"""
self.repo.git.checkout(tag)
if __name__ == '__main__':
local_path = os.path.join('codes', 'liren')
repo = GitRepository(local_path,remote_path)
branch_list = repo.branches()
print(branch_list)
repo.change_to_branch('dev')
repo.pull()
总结
"""
后期你在接触一些模块的时候 也应该想到将该模块所有的方法整合到一起
方便以后的调用
"""