zoukankan      html  css  js  c++  java
  • Python复习笔记(七)线程和进程

    1. 多任务

    • 并行:真的多任务
    • 并发:假的多任务

    2. 多任务-线程

    Python的 Thread模块是比较底层的模块,Python的 Threading模块 是对Thread做了一些包装,可以更加方便的被使用

    2.1 使用threading模块

    线程执行

    # -*- coding: utf-8 -*-
    """
    Created on Mon Mar  4 11:27:41 2019
    
    @author: Douzi
    """
    
    import threading
    import time
    
    def sing():
        for i in range(5):
            print("sing...." + str(i))
            time.sleep(1)
        
    def dance():
        for i in range(5):
            print("dance...." + str(i))
            time.sleep(1)
    
    def main():
        t1 = threading.Thread(target=sing)
        t2 = threading.Thread(target=dance)
        t1.start()
        t2.start()
    
    
    if __name__=="__main__":
        main()

    2.2 查看当前线程数量

    • 当调用Thread的时候,不会创建 线程

    • 调用 Thread创建出来的实例对象的 start方法地时候才会创建线程以及让这个线程开始运行

    # -*- coding: utf-8 -*-
    """
    Created on Mon Mar  4 18:48:16 2019
    
    @author: Douzi
    """
    
    import threading
    import time
    
    def sing():
        for i in range(5):
            print("......sing...%d..." % i)
            time.sleep(1)
        
    
    # 当调用Thread的时候,不会创建 线程
    # 当调用Thread创建出来的实例对象的 start方法地时候才会创建线程
    # 以及让这个线程开始运行
    def main():
        
        # 打印当前线程信息
        print(threading.enumerate())
        t1 = threading.Thread(target=sing)
    
        
        # 在调用Thread之后打印
        print(threading.enumerate())    
        t1.start()    # 子线程开始
        
        # 在调用start之后打印
        print(threading.enumerate())
        
    if __name__=="__main__":
        main()


    3. 线程-注意点

    3.1 线程执行代码的封装

    # -*- coding: utf-8 -*-
    """
    Created on Mon Mar  4 20:16:16 2019
    
    @author: Douzi
    """
    
    import threading
    import time
    
    class MyThread(threading.Thread):
        def run(self):
            for i in range(3):
                time.sleep(1)
                msg = "I'm "+self.name+' @ '+str(i)
                print(msg)
                self.login()
        
        def login(self):
            print("login.......")
    
        def register(self):
            print("register........")            
                
                
    if __name__=="__main__":
        t = MyThread()
        t.start()

     

    3.2 多线程共享全局变量

    • 在一个函数中, 对 全局变量进行修改 的时候,到底是否需要使用global进行说明,要看 是否对全局变量的执行指向进行了修改

    • 如果修改了执行,即让全局变量指向一个新的地方,那么必须使用global。

    • 如果,仅仅是修改了 指向的空间的数据,此时不必用 global

    # -*- coding: utf-8 -*-
    """
    Created on Mon Mar  4 21:01:53 2019
    
    @author: Douzi
    """
    
    import threading
    import time
    
    
    # 定义一个全局变量
    g_num = 100
    
    def test1():
        global g_num
        g_num += 1
        print("......in test1 g_num=%d=....." % g_num)
        
    def test2():
        print(".......in test2 g_num=%d=...." % g_num)
    
    def main():
        t1 = threading.Thread(target=test1)
        t2 = threading.Thread(target=test2)
        
        t1.start()
        time.sleep(1)    
        
        t2.start()
        time.sleep(1)    
        
        print("......in main Thread g_num = %d...." % g_num)
    
    
    if __name__=="__main__":
        main()

     

    3.3 多线程共享全局变量-args参数

    # -*- coding: utf-8 -*-
    """
    Created on Mon Mar  4 21:10:36 2019
    
    @author: Douzi
    """
    
    # -*- coding: utf-8 -*-
    """
    Created on Mon Mar  4 21:01:53 2019
    
    @author: Douzi
    """
    
    import threading
    import time
    
    
    # 定义一个全局变量
    g_num = 100
    
    def test1(temp):
        temp.append(33)
        print("......in test1 g_num=%s=....." % str(temp))
        
    def test2(temp):
        print(".......in test2 g_num=%s=...." % str(temp))
    
    g_nums = [11, 22]
    
    def main():
        # target指定将来 这个线程去哪个函数执行代码
        # args指定将来调用 函数的时候 传递什么数据过去
        t1 = threading.Thread(target=test1, args=(g_nums,))
        t2 = threading.Thread(target=test2, args=(g_nums,))
        
        t1.start()
        time.sleep(1)    
        
        t2.start()
        time.sleep(1)    
        
        print("......in main Thread g_nums = %s...." % str(g_nums))
    
    
    if __name__=="__main__":
        main()

    3.4 多线程-共享全局变量问题

    # -*- coding: utf-8 -*-
    """
    Created on Mon Mar  4 21:01:53 2019
    
    @author: Douzi
    """
    
    import threading
    import time
    
    
    # 定义一个全局变量
    g_num = 0
    
    
    def test1(num):
        global g_num
        for i in range(num):
            g_num += 1
        print("......in test1 g_num=%d=....." % g_num)
        
    def test2(num):
        global g_num
        for i in range(num):
            g_num += 1
        print(".......in test2 g_num=%d=...." % g_num)
    
    def main():
        t1 = threading.Thread(target=test1, args=(1000000,))
        t2 = threading.Thread(target=test2, args=(1000000,))
        
        t1.start()
        t2.start()
        
        # 等待上面2个线程执行完毕.....
        
        time.sleep(5)    
        
        print("......in main Thread g_num = %d...." % g_num)
    
    
    if __name__=="__main__":
        main()

    3.5 互斥锁

    当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制。

    某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”

    import threading
    
    # 创建锁
    mutex = threading.Lock()
    
    # 锁定
    mutex.acquice()
    
    # 释放
    mutex.release()

    注意:

    • 如果这个锁之前是没有上锁的,那么acquire不会阻塞
    • 如果在调用acquire对这个锁上锁之前,他已经被其他线程上了锁,那么此时acquire会阻塞,直到这个锁被解锁为止。

    3.6 使用互斥锁完成2个线程对同一个全局变量各加100w次的操作

    # -*- coding: utf-8 -*-
    """
    Created on Mon Mar  4 22:01:29 2019
    
    @author: Douzi
    """
    
    """使用互斥锁完成2个线程对同一个全局变量各加100w次的操作"""
    
    import threading
    import time
    
    g_num = 0
    
    def test1(num):
        global g_num
        for i in range(num):
            # 上锁        
            mutex.acquire()
            g_num += 1
            # 解锁
            mutex.release()
            
        print("----test1---g_num=%d" % g_num)
    
    def test2(num):
        global g_num
        for i in range(num):
            mutex.acquire()
            g_num += 1
            mutex.release()
            
        print("----test2---g_num=%d" % g_num)
        
    # 创建一个互斥锁,默认是没有上锁的
    mutex = threading.Lock()
    
    def main():
        t1 = threading.Thread(target=test1, args=(100,))
        t2 = threading.Thread(target=test2, args=(100,))
        
        t1.start()
        t2.start()
        
        # 等待上面的2个线程执行完毕....
        time.sleep(2)
        
        print("-----in main Thread g_num = %d" % g_num)    
        
        
    if __name__=="__main__":
        main()

    3.7 死锁、银行家算法


    4. 案例:多任务版udp聊天器

     

    说明

    • 编写一个2个线程的程序
    • 线程1用来接收数据然后显示
    • 线程2用来检测数据然后通过udp发送数据
    # -*- coding: utf-8 -*-
    """
    Created on Thu Mar  7 10:42:01 2019
    
    @author: Douzi
    """
    
    import socket
    import threading
    
    def recv_Message(udp_socket):
        """接收数据显示"""
        # 接收数据    
        while True:    
            recv_data = udp_socket.recvfrom(1024)
            print(recv_data)
    
    def send_Message(udp_socket, dest_ip, dest_port):
        # 发送数据
        while True:
            send_data = input("输入要发送的数据:")
            udp_socket.sendto(send_data.encode("utf-8"), (dest_ip, dest_port))
    
    def main():
        """完成udp聊天器的整体控制"""
        
        # 1. 创建套接字
        udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # 2. 绑定本地信息
        udp_socket.bind(("172.20.10.5", 7891))
        
        # 172.20.10.5, 
        # 3. 输入对方ip
        dest_ip = input("输入对方ip:")
        dest_port = input("输入对方port:")    
        
        # 4. 创建两个线程,去执行相应的功能
        t_recv = threading.Thread(target=recv_Message, args=(udp_socket,))
        t_send = threading.Thread(target=send_Message, args=(udp_socket,dest_ip, dest_port))
                    
        t_recv.start()
        t_send.start()
        
    
    if __name__=="__main__":
        main()

    5. 多任务-进程

    import multiprocessing 
    import time            
    
    def test1():
        while True:
            print("1.........")     
            time.sleep(1)
    
    def test2():
        while True:
            print("2.........")     
            time.sleep(1)
    
    
    def main():
        p1 = multiprocessing.Process(target=test1)
        p2 = multiprocessing.Process(target=test2)
        p1.start()
        p2.start()
    
    if __name__=="__main__":
        main()


    6. 进程和线程对比

    • 线程不能独立执行,必须依存在进程中

    优缺点:

    • 线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程相反

     


    7. 通过队列完成进程间通信

    from multiprocessing import Queue
    
    q = Queue(3)  # 初始化一个Queue对象
    q.put("消息1")
    q.put("消息2")
    print(q.full())  # False
    q.put("消息3")
    print(q.full())  # True
    
    # 因为消息队列已满,下面try会抛出异常,第一个try会等待2秒后抛出异常,第二个Try会立刻抛出异常
    try:
        q.put("消息4", True, 2)
    except:
        print("消息列队已满,现有消息数量:%s" % q.qsize())
        
    try:
        q.put_nowait("消息4")
    except:
        print("消息队列已满,现有:%s" % q.qsize())

    # -*- coding: utf-8 -*-
    """
    Created on Fri Mar  8 16:42:07 2019
    
    @author: Douzi
    """
    
    import multiprocessing
    
    
    def download_from_web(q):
        # 模拟从网上下载的数据
        data = [11, 22, 33, 44]
    
        for tmp in data:
            q.put(tmp)
    
        print("...下载器已经下载完了数据并且存入到队列中....")
    
    
    def analysis_data(q):
        # 数据处理
        # 从队列中获取数据
        waitting_analysis_data = list()
        while True:
            data = q.get()
            waitting_analysis_data.append(data)
            if q.empty():
                break
    
        # 模拟数据处理
        print(waitting_analysis_data)
    
    
    def main():
        # 1.创建一个队列
        q = multiprocessing.Queue()
    
        # 2.创建多个进程,将队列的引用当作实参进行传递到里面
        p1 = multiprocessing.Process(target=download_from_web, args=(q,))
        p2 = multiprocessing.Process(target=analysis_data, args=(q,))
        p1.start()
        p2.start()
    
    
    if __name__=="__main__":
        main()

    ...下载器已经下载完了数据并且存入到队列中....
    [11, 22, 33, 44]


    8. 进程池Pool

    当要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果是上百个目标,手动创建进程的工作量巨大,此时可以用到Pool方法。

    初始化Pool时,可以指定一个最大的进程数,当有新的请求提交到Pool中时,如果池没有满,那么就会创建一个新的进程用来执行该请求;如果池达到指定的最大值,那么请求就会等待,直到池中有进程结束,池会用之前的进程执行新的任务,看下面实例:

    #!/user/bin/python3
    from multiprocessing import Pool
    import os, time, random
    
    def worker(msg):
        t_start = time.time()
        print("%s开始执行, 进程号为%d" % (msg, os.getpid()))
        # random.random()随机生成0~1之间的浮点数
        time.sleep(random.random()*2)
        t_stop = time.time()
        print(msg, "执行完毕,耗时%0.2f" % (t_stop-t_start))
    
    po = Pool(3)  # 定义一个进程池,最大进程数3
    for i in range(0, 10):
        # Pool().apply_async(要调用的目标, (传递给目标的参数元祖,))
        # 每次循环将会用空闲出来的子进程去调用目标
        po.apply_async(worker, (i,))
    
    print("----start-----")
    po.close()   # 关闭进程池,关闭后po不再接收新的请求
    po.join()    # 等待po中所有子进程执行完成,必须放在close语句之后
    print("-----end------")

    注意:如果 要保证进程池里的进程先结束,主进程再结束,需要加上 po.join()等待进程池中所有子进程执行完成(必须放在po.close()之后)

     ----start-----
    0开始执行, 进程号为26270
    1开始执行, 进程号为26271
    2开始执行, 进程号为26272
    0 执行完毕,耗时0.24
    3开始执行, 进程号为26270
    2 执行完毕,耗时0.65
    4开始执行, 进程号为26272
    3 执行完毕,耗时0.42
    5开始执行, 进程号为26270
    5 执行完毕,耗时0.43
    6开始执行, 进程号为26270
    1 执行完毕,耗时1.14
    7开始执行, 进程号为26271
    6 执行完毕,耗时0.37
    8开始执行, 进程号为26270
    8 执行完毕,耗时0.28
    9开始执行, 进程号为26270
    7 执行完毕,耗时0.82
    4 执行完毕,耗时1.35
    9 执行完毕,耗时1.59
    -----end------


    9. 文件夹copy器(多任务)

    import multiprocessing
    import os
    
    def copy_file(file_name, old_folder_name, new_folder_name):
        """完成文件的复制"""
        print("=======模拟copy文件: 从%s--->到%s 文件名是:%s" % (old_folder_name, new_folder_name, file_name))
        with open(old_folder_name + "/" + file_name, "rb") as old_f:
            content = old_f.read()
    
        with open(new_folder_name + "/" + file_name, "wb") as new_f:
            new_f.write(content)
    
    def main():
        # 1.获取用户要copy的文件
        old_folder_name = input("请输入要copy的文件夹名字:")
    
        # 2. 创建一个新的文件夹
        try:
            new_folder_name = old_folder_name+"[复件]"
            os.mkdir(new_folder_name)
        except:
            pass
        # 3. 获取文件夹的所有待copy的文件名字 listdir()
        file_names = os.listdir(old_folder_name)
        print(file_names)
        # 4. 创建进程池
        pools = multiprocessing.Pool(5)
    
        # 5. 向进程池中 添加copy文件的任务
        for file_name in file_names:
            pools.apply_async(copy_file, args=(file_name, old_folder_name, new_folder_name))
    
        # 复制源文件夹中的文件,到新文件夹重点文件去(进程池)
        pools.close()
        pools.join()
    
    
    if __name__ == '__main__':
        main()

    =======模拟copy文件: 从test--->到test[复件] 文件名是:uuid.py
    =======模拟copy文件: 从test--->到test[复件] 文件名是:smtpd.py
    =======模拟copy文件: 从test--->到test[复件] 文件名是:copyreg.py
    =======模拟copy文件: 从test--->到test[复件] 文件名是:random.py
    =======模拟copy文件: 从test--->到test[复件] 文件名是:decimal.py
    =======模拟copy文件: 从test--->到test[复件] 文件名是:tty.py
    =======模拟copy文件: 从test--->到test[复件] 文件名是:csv.py(略)

    10. 文件夹copy器v2_显示进度(进程间通信)

    from multiprocessing import Manager, Pool
    import os, time, random
    
    def copy_file(queue, file_name, old_folder_name, new_folder_name):
        """完成文件的复制"""
        # print("=======模拟copy文件: 从%s--->到%s 文件名是:%s" % (old_folder_name, new_folder_name, file_name))
        f_read = open(old_folder_name + "/" + file_name, "rb")
    
        f_write = open(new_folder_name + "/" + file_name, "wb")
        while True:
            time.sleep(0.001)
            content = f_read.read(1024)
            if content:
                f_write.write(content)
            else:
                break
        f_read.close()
        f_write.close()
    
        # 发送已经拷贝完毕的文件名字
        queue.put(file_name)
    
    def main():
        # 1.获取用户要copy的文件
        old_folder_name = input("请输入要copy的文件夹名字:")
    
        # 2. 创建一个新的文件夹
        try:
            new_folder_name = old_folder_name+"[复件]"
            os.mkdir(new_folder_name)
        except:
            pass
        # 3. 获取文件夹的所有待copy的文件名字 listdir()
        file_names = os.listdir(old_folder_name)
        # print(file_names)
    
        # 创建Queue
        queue = Manager().Queue()
    
    
        # 4. 创建进程池
        pools = Pool(5)
    
        # 5. 向进程池中 添加copy文件的任务
        for file_name in file_names:
            pools.apply_async(copy_file, args=(queue, file_name, old_folder_name, new_folder_name))
    
        # 主进程显示进度
        pools.close()
        # pools.join()
    
        all_file_num = len(file_names)
        while True:
            file_name = queue.get()       # 子进程放一个名字,这里取一个名字,取不到则会等待。实现进程间通信
            if file_name in file_names:
                file_names.remove(file_name)
    
            copy_rate = (all_file_num - len(file_names))*100 / all_file_num
            print("
    %.2f%%...(%s)" % (copy_rate, file_name) + " "*50, end="")
            if copy_rate >= 100:
                break
        print()
    
    
    if __name__ == '__main__':
        main()

    请输入要copy的文件夹名字:test
    100.00%...(shelve.py)  

  • 相关阅读:
    RabbitMQ笔记-基础知识
    什么是HashMap【转】
    Mysql笔记-查询缓存
    Redis笔记-配置文件
    Autofac-.net core控制台使用依赖注入【转】
    布隆过滤器(c#简单实现)【转】
    Redis笔记-布隆过滤器组件
    StackExchange.Redis笔记-分布式锁
    StackExchange.Redis笔记-发布订阅
    StackExchange.Redis笔记-性能调优【转】
  • 原文地址:https://www.cnblogs.com/douzujun/p/10468603.html
Copyright © 2011-2022 走看看