zoukankan      html  css  js  c++  java
  • day09 threading, paramiko, queue 模块

    1 模拟ssh

    2 锁 内部锁,程序锁,信号量

    3 多线程

    4  简单消息队列

     

    先来看模拟ssh  ,python 的强大之处就是因为有很多模块,可以很简单的完成复杂的事情,今天我们用paramiko 模块来模拟一个ssh 的交互

    ssh:  只可远程执行linux 服务(或者是有ssh 服务的系统)

    1 先简单执行命令测试下

    #!/usr/bin/env python3
    # Author: Shen Yang
    import paramiko
    #help(paramiko)
    #实例化一个客户端
    ssh = paramiko.SSHClient()
    #允许未登陆过的登陆
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    #连接远程主机
    ssh.connect(hostname='192.168.81.133',port=22,username='root',password='7758521')
    #执行命令,收集三种结果,标准输入,标准输出,标准错误
    stdin,stdout,stderr = ssh.exec_command('uptime')
    #获取结果,转码
    result = stdout.read().decode()
    result_err = stderr.read().decode()
    #打印
    print(result)
    if result_err:
        print('error!',result_err)
    

     

    是不是很激动! 确实,这样感觉太好了,有点类似于slat 的感觉了!

    解释一下上面的:

    stdin,stdout,stderr = ssh.exec_command('uptime')

    我们看到三个变量,

    stdin 是获取输入的值,这里并没有。

    stdout 是获取标准输出,就像我们在shell 下执行命令获取到的正确结果,为什么说是正确结果呢?因为

    stderr 是获取当命令执行错误后的结果的。

    这里并不支持交互性的命令比如top 什么的,如果非用top 记得加上 -bn 1

    scp: 可以从linux上传和下载  远端服务器必须有ssh 服务
    import paramiko
    #建立一个通道
    transport = paramiko.Transport(('192.168.81.133',22))
    transport.connect(username='root',password='7758521')
    #从这个通道开始传输文件
    sftp = paramiko.SFTPClient.from_transport(transport)
    # 将本地文件scp_client.py传输至远程服务器/tmp/yuanduan.py
    sftp.put('scp_client.py','/tmp/yuanduan.py')
    #将远端文件/tmp/yuanduan.py下载至本地/tmp/yuanduan.py
    sftp.get('/tmp/yuanduan.py','/tmp/yuanduan.py')
    

    结果:

    上面的是使用密码方式,那么我们是否可以使用密钥方式来连接远端服务器呢?答案是肯定的

    使用密钥连接ssh
    #!/usr/bin/env python3
    # Author: Shen Yang
    import paramiko
    #指定Key 文件路径
    key_file = paramiko.RSAKey.from_private_key_file('mykey')
    host = '192.168.1.201'
    
    #创建客户端对象
    ssh = paramiko.SSHClient()
    #允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    #连接服务器
    ssh.connect(hostname=host,port=22,username='root', key_filename='mykey')
    
    #执行命令
    stdin,stdout,stderr = ssh.exec_command('uptime')
    
    #获取命令结果
    result = stdout.read()
    print(result.decode())
    #关闭连接
    ssh.close()
    

      

    好了,关于模拟ssh 的paramiko 模我们先用到这里,以后再用到其他功能再细研究吧。。

     下面开始讲线程和进程,比较重要!

    先将线程,什么是线程呢(thread)?

    线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务

     画个图来表示一下:

    每一个程序的内存是独立的

    每一个程序一个进程:

    例如:  qq 要以一个整体的形式暴露给操作系统管理,里面包含对各种资源的调用,内存的管理,网络接口的调用。

    对各种资源管理的集合就可以称为 进程。

    进程要操作cpu  必须要先创建一个线程。
    进程是资源的集合,不能执行,要执行,必须先生成一个线程。

    线程: 是操作系统最小的调度单位,是一串指令的集合。

    一个进程创建一个线程 然后通过这个线程创建多个线程,子线程还可以创建更多线程。
     
    他们的关系是平等的  线程之间是独立的 没有隶属关系,新创建的线程不会因为创建他的线程关闭而关闭。
     
     
    进程和线程没有可比性,不是一个东西。
    但是:
    启动一个线程比启动一个进程快!

     

    对于一个主线程的修改可能会影响同一进程下的其他线程。

     创建子进程相当于对父进程的克隆

     

    多个子进程之间的数据不同,不是共享的。
    修改进程1 的数据不会改变进程2的数据。。  

    让我们实践的操作一下,下面是一个简单的事例,证明一下这是一个并发的操作:

    #!/usr/bin/env python3
    # Author: Shen Yang
    import threading,time
    def run(n):
        print('this is ',n)
        time.sleep(2)
    #实例化两个任务,   target 是要执行的任务, args 是传入的参数,是一个元组,即便是一个参数也要使用,分割
    t1 = threading.Thread(target=run,args=('t1',))
    t2 = threading.Thread(target=run,args=('t2',))
    #运行
    t1.start()
    t2.start()
    
    结果:
    sleep 2秒是同时sleep的

    第二种使用方式,不常用
    通过类调用使用:
     
    1 先写一个类,继承Thread
    2 执行新类即可
    #定义一个类,继承Thread
    class MyThread(threading.Thread):
        def __init__(self,n):
            super(MyThread, self).__init__()
            self.n = n
        def run(self):
            print('running task ',self.n)
            time.sleep(2)
    
    #实例化两个任务
    t1 = MyThread('t1')
    t2 = MyThread('t2')
    
    #启动
    t1.start()
    t2.start()
    

    结果同上

    那么我们是否可以计算所有线程的结束时间吗,可以的:

    #!/usr/bin/env python3
    # Author: Shen Yang
    import threading,time
    
    #定义任务过程
    def run(n):
        print('this is ',n)
        time.sleep(2)
    
    #记住开始时间
    start_time = time.time()
    
    #设置空对象列表
    obj_list = []
    #生成每个对象执行并把对象加入列表中
    for i in range(50):
        t = threading.Thread(target=run,args=(i,))
        t.start()
        obj_list.append(t)
    #循环列表里的对象来等待所有对象执行完毕
    for t in obj_list:
        t.join()
    #等所有对象执行完毕后执行计算时间
    print('cost:',time.time() - start_time)
    

     

    打印主线程:

     

    打印子线程:

     

    打印活动的线程个数:

     守护进程:

    守护进程是仆人,主进程是主人,主人退出,守护进程也要退出。

     socket  server  可以设置守护线程,这样在手动退出的时候不会等待其他线程结束就退出。

     实践:

    #!/usr/bin/env python3
    # Author: Shen Yang
    import threading,time
    
    end_list = []
    
    def run(n):
        print('this is ',n)
        time.sleep(1)
        end_list.append(n)
    
    start_time = time.time()
    
    #设置空对象列表
    obj_list = []
    #生成每个对象执行并把对象加入列表中
    for i in range(5000):
        t = threading.Thread(target=run,args=(i,))
        t.setDaemon(True)
        t.start()
        obj_list.append(t)
    
    #等所有对象执行完毕后执行计算时间
    print('cost:',time.time() - start_time)
    time.sleep(0.8)
    
    #print(end_list)
    print(len(end_list))
    

    可以看到,执行了一半就终止了,其他的被强制退出了

    接下来,讲一下lock

    无论你有多少核,python 同时只能运行一个线程,一个缺陷
     
     
    同一时间,只有一个线程可以在任意cpu核心执行,是gil 控制了这一点
     
    pypy  jit  可以提前把代码预编译为机器码,速度更快
     
    jpython 也没有这个锁,因为是在jre环境运行的
     
     
    锁的存在会出现一个问题:
    解释器每100条指令(不是pyhton 命令,而是更底层的指令)切换一次锁
    所以当某个线程在执行大于100条数据的时候还没等执行完就处于等待状态了,就是一个变量还没修改就被第二个线程掉取,导致数据出错
     
    解决办法:
    在用户层再加一层锁,保证同一时间修改同一个数据

     

    如果有多层锁,就会有几率用错钥匙解锁,导致进入不能出来锁,
    所以需要用到Rlock()

    线程锁(互斥锁Mutex)

    一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?

     

    其实信号量就是一个特殊的锁,可以允许多个线程的锁

    说白了就是和锁相反:同一时间最多可以有几个线程修改数据

    可以控制同时又多少个线程在运行
     
    涉及到连接池,和线程数量
    可以控制mysql 的连接数, 和socketserver的同一时间连接的client数量
    #!/usr/bin/env python3
    # Author: Shen Yang
    import threading,time
    def run(n):
        semaphore.acquire()
        time.sleep(1)
        print('run the thread:{_n}
    '.format(_n=n))
        semaphore.release()
    if __name__ == '__main__':
        semaphore = threading.BoundedSemaphore(5)
        for i in range(22):
            t = threading.Thread(target=run,args=(i,))
            t.start()
    while threading.active_count() != 1:
        pass
    else:
        print('-----all threads done-------')
    

      

    看下效果:

    事件:

    Events

    用于线程之间的数据同步,

    event  就是在设置一个全局变量,只不过是再封装了一层
    另一个线程不断检查这个变量,来根据这个变量进行相应的操作

     一个红灯停绿灯行的例子:

    实践一下:

    #!/usr/bin/env python3
    # Author: Shen Yang
    import threading,time
    
    #实例化一个event
    event = threading.Event()
    
    #定义红绿灯
    def lighter():
        count = 0
        event.set() #设置标志位
        while True:
            if count >5 and count <10:  #判断更改标志位
                event.clear()   #清理
                print('33[41;1mred light is on ...33[0m')
            elif count >10:
                event.set() #设置
                count = 0 #归0
            else:    #其他为设置状态
                print('33[46;1mgreen light is on ...33[0m')
            time.sleep(1)
            count += 1
    
    #定义汽车
    def car(name):
        while True:
            if event.is_set(): #有设置
                print('{_name} running ...'.format(_name = name))
                time.sleep(1)
            else:
                print('{_name} see the red light ,waiting ...'.format(_name = name))
                event.wait()
                print('33[46;1m{_name} see green light is on,start going ...33[0m'.format(_name = name))
    
    #启动灯
    light = threading.Thread(target=lighter,)
    light.start()
    
    #启动汽车
    car1 = threading.Thread(target=car,args=('Tesla',))
    car2 = threading.Thread(target=car,args=('Fit',))
    car3 = threading.Thread(target=car,args=('Civic',))
    car1.start()
    car2.start()
    car3.start()
    

    看下效果:

     

     

    作用:
    1 解耦
    2 提高工作效率
     
     
    和列表相比,就是数据自动消失
     
     
    数据存在内存中
     
    队列有几种方式,如下
    1 先进先出:

     

    使用 get_nowait() 通过判断异常知道队列为空

     

    也可以实现:

     

     可以等待:

     

     可以设置队列长度:

     

    卡住了
     
    可以设置阻塞状态和等待时间 然后抛异常,判断

    后入先出:

     

    设置队列优先级:
    可用于VIP 队列:

     

    优先级越低越优先

     

     

    生产者:

     

    消费者:

     

    执行:

     

  • 相关阅读:
    《ASP.NET Core跨平台开发从入门到实战》Web API自定义格式化protobuf
    .NET Core中文分词组件jieba.NET Core
    .NET Core 2.0及.NET Standard 2.0
    Visual Studio 2017 通过SSH 调试Linux 上.NET Core
    Visual Studio 2017 ASP.NET Core开发
    Visual Studio 2017正式版离线安装及介绍
    在.NET Core 上运行的 WordPress
    IT人员如何开好站立会议
    puppeteer(二)操作实例——新Web自动化工具更轻巧更简单
    puppeteer(一)环境搭建——新Web自动化工具(同selenium)
  • 原文地址:https://www.cnblogs.com/yangever/p/5898903.html
Copyright © 2011-2022 走看看