zoukankan      html  css  js  c++  java
  • Python用法速查@进程-线程-协程

    进程

    进程是正在运行程序的实例,一个程序可以有多个进程。

    pid=os.fork()

    作用:拷贝一份父进程(代码段,堆栈段,数据段)的数据来创建一个子进程。
    返回:父进程返回子进程进程号,子进程返回0。
    特点:子进程父进程相互独立,不在共享任何数据。

    #以下代码在Linux下执行,window下没有fork函数
    import os
    
    pid = os.fork()
    
    if pid < 0:
        print 'Fail to create process'
    elif pid == 0:
        print 'I am child process (%s) and my parent is (%s).' % (os.getpid(), os.getppid()) #os.getpid()获得当前(子)进程号,os.getppid()获得父进程号
    else:
        print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
    
    #结果
    I (86645) just created a child process (86646).
    I am child process (86646) and my parent is (86645).
    
    

    下验证了子进程从pid=os.fork()之后开始执行

    
    [root@VM-0-15-centos ~]# cat tt.py
    import os
    
    print('start fork...')
    pid = os.fork()
    print('fork finish..')
    
    
    if pid < 0:
        print('create False.')
    elif pid == 0:
        print('I am child_proc [%d]' % (os.getpid()))
    else:
        print('I am father_proc [%d]' % (os.getpid()))
    
    
    
    
    [root@VM-0-15-centos ~]#
    [root@VM-0-15-centos ~]# python3 tt.py
    start fork...
    fork finish..
    I am father_proc [25188]
    fork finish..
    I am child_proc [25189]
    [root@VM-0-15-centos ~]#
    

    多进程

    p=Process(target=child_proc, args=(x,))

    import os
    from multiprocessing import Process
    
    # 子进程要执行的代码
    def child_proc(name):
        print 'Run child process %s (%s)...' % (name, os.getpid())
    
    if __name__ == '__main__':
        print 'Parent process %s.' % os.getpid()
        p = Process(target=child_proc, args=('test',))
        print 'Process will start.'
        p.start()
        p.join() #阻塞所有其他进程(包括父进程)
        print 'Process end.'
    #结果
    Parent process 7170.
    Process will start.
    Run child process test (10075)...
    Process end.
    
    

    from multiprocessing import Process'

    multiprocessing与平台有关,同一段代码:

    import random
    import os
    from multiprocessing import Process
    
    num = random.randint(0, 100)
    
    def show_num():
        print("pid:{}, num is {}".format(os.getpid(), num))
    
    if __name__ == "__main__":
        print("pid:{}, num is {}".format(os.getpid(), num))
        p = Process(target=show_num) #创建新进程
        p.start()
        p.join()
    
    

    windows

    pid:6504, num is 25
    pid:6880, num is 6
    
    

    Linux

    pid:11747, num is 13
    pid:11748, num is 13
    
    

    我们发现window下num结果竟然不一样???
    个人理解:
    windows没有fork(),就不能像Linux那样让子进程从被创建的哪段代码之后开始执行。

    spawn/fork/forkserver

    这是因为根据系统平台的不同multiprocessing提供了3种启动进程的方式

    spawn

    父进程会启动一个新的解释器,子进程只会继承run()所需的资源。 不必要的文件描述符和句柄(一种指针)不会被继承。 该方法和fork,forkserver相比,启动进程较慢。

    可在Unix和Windows上使用。 Windows上的默认设置。

    fork

    父进程使用 os.fork() 来产生 Python 解释器分叉。 子进程在开始时实际上与父进程相同,并且会继承父进程的所有资源。 多线程的安全是有问题的。

    只能在Unix上使用。Unix上的默认设置。

    forkserver

    程序启动并选择forkserver启动方法时,将启动一个服务器进程。 之后每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。 分叉服务器进程是单线程的,因此使用 os.fork() 是安全的。 没有不必要的资源被继承。

    可在Unix平台上使用,并支持通过Unix管道传递文件描述符。

    进程池

    可以使用进程池来创建多个进程

    p=Pool(n)

    p.apply_async(chile_func, args=(a,))

    import os, time
    from multiprocessing import Pool
    
    def foo(x):
        print 'Run task %s (pid:%s)...' % (x, os.getpid())
        time.sleep(2)
        print 'Task %s result is: %s' % (x, x * x)
    
    if __name__ == '__main__':
        print 'Parent process %s.' % os.getpid()
        p = Pool(4)         # 设置进程数
        for i in range(5):
            p.apply_async(foo, args=(i,))    # 设置每个进程要执行的函数和参数
        print 'Waiting for all subprocesses done...'
        p.close() #关闭进程池
        p.join()  #等待所有子进程执行完毕
        print 'All subprocesses done.'
    
    #结果
    Parent process 7170.
    Run task 1 (pid:10320)...
    Run task 0 (pid:10319)...
    Run task 3 (pid:10322)...
    Run task 2 (pid:10321)...
    Waiting for all subprocesses done...
    Task 1 result is: 1
    Task 0 result is: 0
    Run task 4 (pid:10320)...
    Task 3 result is: 9
    Task 2 result is: 4
    Task 4 result is: 16
    All subprocesses done.
    
    

    进程通信

    q=Queue()

    p1=Processing(func_1, args=(q,))/q.put(message)

    p2=Processing(func_2, args=(q,))/q.get(message)

    # -*- coding: utf-8 -*-
    
    from multiprocessing import Process, Queue
    
    # 向队列中写入数据
    def write_task(q):
        try:
            n = 1
            while n < 5:
                print "write, %d" % n
                q.put(n)
                time.sleep(1)
                n += 1
        except BaseException:
            print "write_task error"
        finally:
            print "write_task end"
    
    # 从队列读取数据
    def read_task(q):
        try:
            n = 1
            while n < 5:
                print "read, %d" % q.get()
                time.sleep(1)
                n += 1
        except BaseException:
            print "read_task error"
        finally:
            print "read_task end"
    
    if __name__ == "__main__":
        q = Queue()  # 父进程创建Queue,并传给各个子进程
    
        pw = Process(target=write_task, args=(q,))
        pr = Process(target=read_task, args=(q,))
    
        pw.start()   # 启动子进程 pw,写入
        pr.start()   # 启动子进程 pr,读取
        pw.join()    # 等待 pw 结束
        pr.join()    # 等待 pr 结束
        print "DONE"
    	
    	
    	
    #结果
    write, 1
    read, 1
    write, 2
    read, 2
    write, 3
    read, 3
    write, 4
    read, 4
    write_task end
    read_task end
    DONE
    
    

    线程

    线程是进程的一个实例,一个进程至少有一个线程。
    区分
    进程(CPU分配资源的基本单位)之间相互独立,不共享数据,一个崩溃不影响其他进程。
    多个线程(CPU调度分派的基本单位)共享同一个进程的数据,一个崩溃全部崩溃。

    t=Thread(child_thre, args=(a, ), name='xxx')

    from threading import Thread, current_thread
    
    def hello(message):
        print('%s, I am %s' % (message, current_thread().name))
        print('bye~')
    
    if __name__ == '__main__':
        message='HELLO'
        print('%s, I AM %s' % (message, current_thread().name))
        t = Thread(target=hello, args=('hello',), name='child_thread')
        t.start()
        t.join()
        print('BYE~')
    	
    	
    #结果
    D:pythonpython.exe E:/PYTHON/Basics/Fun/HelloYoutube.py
    HELLO, I AM MainThread
    hello, I am child_thread
    bye~
    BYE~
    
    Process finished with exit code 0
    
    

    l = Lock()

    l.acquire()

    l.release()

    线程共享进程的数据,就会遇到线程同步的问题

    from threading import Thread, current_thread
    
    count = 0
    
    
    def calc():
        global count
        print('Hello, this is %s. ' % current_thread().name)
        for _ in range(1000000):  #每个线程对共享数据count做一百万次加一操作
            count += 1
    
        print('thread %s is finish.' % current_thread().name)
    
    
    if __name__ == '__main__':
        print('HELLO, THIS IS %s' % current_thread().name)
        t_list = []
    
        for i in range(5):
            t_list.append(Thread(target=calc)) #创建5个线程
            t_list[i].start()
    
        for i in t_list:
            i.join()
    
        print('count = %s' % count)
        print('THREAD %s FINISH.' % current_thread().name)
    
    #结果
    D:pythonpython.exe E:/PYTHON/Basics/Fun/HelloYoutube.py
    HELLO, THIS IS MainThread
    Hello, this is Thread-1. 
    Hello, this is Thread-2. 
    Hello, this is Thread-3. 
    thread Thread-1 is finish.
    thread Thread-2 is finish.
    Hello, this is Thread-4.
    thread Thread-3 is finish.
    Hello, this is Thread-5. 
    thread Thread-4 is finish.
    thread Thread-5 is finish.
    count = 3567886
    THREAD MainThread FINISH.
    
    Process finished with exit code 0
    
    

    结果为什么不是5000000?
    count在被一个线程读走还未完成加一操作,又被另一个线程读走,导致读脏数据。
    我还发现一个有趣的现象:
    在def calc()中打印每次count做加一操作后的值,当循环次数比较小如十万时。数字不是递增1有的会差几十,但最终count的值却是正确的(五十万)

    我们可用锁解决这个问题

    import time
    from threading import Thread, current_thread, Lock
    start = time.time()
    
    count = 0
    lock = Lock()
    
    def calc():
        global count
        print('Hello, this is %s. ' % current_thread().name)
        for _ in range(1000000):
            lock.acquire() #获得锁
            count += 1
            lock.release() #释放锁
    
        print('thread %s is finish.' % current_thread().name)
    
    
    if __name__ == '__main__':
        print('HELLO, THIS IS %s' % current_thread().name)
        t_list = []
    
        for i in range(5):
            t_list.append(Thread(target=calc))
            t_list[i].start()
    
        for i in t_list:
            i.join()
    
        print('count = %s' % count)
        print('THREAD %s FINISH.' % current_thread().name)
    
    end = time.time()
    print('Running time:%d' % (end-start))
    
    # 结果
    D:pythonpython.exe E:/PYTHON/Basics/Fun/HelloYoutube.py
    HELLO, THIS IS MainThread
    Hello, this is Thread-1. 
    Hello, this is Thread-2. 
    Hello, this is Thread-3. 
    Hello, this is Thread-4. 
    Hello, this is Thread-5. 
    thread Thread-1 is finish.
    thread Thread-2 is finish.
    thread Thread-3 is finish.
    thread Thread-4 is finish.thread Thread-5 is finish.
    
    count = 5000000
    THREAD MainThread FINISH.
    Running time:13 #明显感觉到执行时间变长了,13秒!!!,不加锁是0秒(取整)。
    
    Process finished with exit code 0
    
    

    GIL info

    讲到 Python 中的多线程,就不得不面对 GIL 锁,GIL 锁的存在导致 Python 不能有效地使用多线程实现多核任务,因为在同一时间,只能有一个线程在运行。

    GIL 全称是 Global Interpreter Lock,译为全局解释锁。早期的 Python 为了支持多线程,引入了 GIL 锁,用于解决多线程之间数据共享和同步的问题。但这种实现方式后来被发现是非常低效的,当大家试图去除 GIL 的时候,却发现大量库代码已重度依赖 GIL,由于各种各样的历史原因,GIL 锁就一直保留到现在。

    global_data = local()

    我们知道多个线程共享一个数据,如何让线程拥有自己独有的数据?,有两种不完美的方法:
    1.在函数中定义局部变量,但不便于修改。

    from threading import Thread, current_thread
    
    def echo(num):
        print current_thread().name, num
    
    def calc():
        print 'thread %s is running...' % current_thread().name
        local_num = 0
        for _ in xrange(10000):
            local_num += 1
        echo(local_num)
        print 'thread %s ended.' % current_thread().name
    
    if __name__ == '__main__':
        print 'thread %s is running...' % current_thread().name
    
        threads = []
        for i in range(5):
            threads.append(Thread(target=calc))
            threads[i].start()
        for i in range(5):
            threads[i].join()
    
        print 'thread %s ended.' % current_thread().name
    # 结果 很显然确实互不影响
    thread MainThread is running...
    thread Thread-4 is running...
    Thread-4 10000
    thread Thread-4 ended.
    thread Thread-5 is running...
    Thread-5 10000
    thread Thread-5 ended.
    thread Thread-6 is running...
    Thread-6 10000
    thread Thread-6 ended.
    thread Thread-7 is running...
    Thread-7 10000
    thread Thread-7 ended.
    thread Thread-8 is running...
    Thread-8 10000
    thread Thread-8 ended.
    thread MainThread end
    

    2.把线程独有数据存在同一个dict{key=current_thread,value=data}中,却又使得每个线程都可访问该数据,非常不安全。

    from threading import Thread, current_thread
    
    global_dict = {}
    
    def echo():
        num = global_dict[current_thread()]    # 线程根据自己的 ID 获取数据
        print current_thread().name, num
    
    def calc():
        print 'thread %s is running...' % current_thread().name
        
        global_dict[current_thread()] = 0
        for _ in xrange(10000):
            global_dict[current_thread()] += 1
        echo()
        
        print 'thread %s ended.' % current_thread().name
    
    if __name__ == '__main__':
        print 'thread %s is running...' % current_thread().name
    
        threads = []
        for i in range(5):
            threads.append(Thread(target=calc))
            threads[i].start()
        for i in range(5):
            threads[i].join()
    
        print 'thread %s ended.' % current_thread().name
    
    # 结果
    thread MainThread is running...
    thread Thread-64 is running...
    thread Thread-65 is running...
    thread Thread-66 is running...
    thread Thread-67 is running...
    thread Thread-68 is running...
    Thread-67 10000
    thread Thread-67 ended.
    Thread-65 10000
    thread Thread-65 ended.
    Thread-68 10000
    thread Thread-68 ended.
    Thread-66 10000
    thread Thread-66 ended.
    Thread-64 10000
    thread Thread-64 ended.
    thread MainThread ended.
    
    
    

    local()应运而生!!!

    global_data.num = 0

    local()提供了一种线程绑定自己独有数据的方式。

    from threading import Thread, current_thread, local
    
    global_data = local()
    
    def echo():
        num = global_data.num
        print current_thread().name, num
    
    def calc():
        print 'thread %s is running...' % current_thread().name
        
        global_data.num = 0 #对于不同的线程global_data.num的值是不同的
        for _ in xrange(10000):
            global_data.num += 1
        echo()
        
        print 'thread %s ended.' % current_thread().name
    
    if __name__ == '__main__':
        print 'thread %s is running...' % current_thread().name
    
        threads = []
        for i in range(5):
            threads.append(Thread(target=calc))
            threads[i].start()
        for i in range(5):
            threads[i].join()
    
        print 'thread %s ended.' % current_thread().name
    
    # 结果
    
    thread MainThread is running...
    thread Thread-94 is running...
    thread Thread-95 is running...
    thread Thread-96 is running...
    thread Thread-97 is running...
    thread Thread-98 is running...
    Thread-96 10000
    thread Thread-96 ended.
    Thread-97 10000
    thread Thread-97 ended.
    Thread-95 10000
    thread Thread-95 ended.
    Thread-98 10000
    thread Thread-98 ended.
    Thread-94 10000
    thread Thread-94 ended.
    thread MainThread ended.
    
    
    

    协程

    一次函数调用,只有一个入口,一次返回,调用顺序明确。
    1.协程有多个入口对函数进行中断、继续执行
    2.相对于多线程,它只在一个线程中执行,避免了线程间的切换开销、访问冲突。
    可通过创建协程将异步编程同步化
    技术支持
    python2 yield
    python3.5 async/await
    第三方库gevent

    n = yield data

    Python2 可以通过 yield 来实现基本的协程,但不够强大,第三方库 gevent 对协程提供了强大的支持。另外,Python3.5 提供了 async/await 语法来实现对协程的支持。本文只讨论通过 yield 来实现协程。

    #生产者消费者问题
    
    import time
    
    def consumer(): #带有yild的函数被叫做生成器generator
        message = ''
        while True:
            n = yield message     # yield 使函数中断
            if not n:
                return
            print '[CONSUMER] Consuming %s...' % n
            time.sleep(2)
            message = '200 OK'
    
    def produce(c):
        c.next()           # 启动生成器 ~~我理解为生成器被中断后next下一个开始执行的位置~~
    	               # python3 用 c.__next__()
        n = 0
        while n < 5:
            n = n + 1
            print '[PRODUCER] Producing %s...' % n
            r = c.send(n)  # 通过 send 切换到 consumer 执行
            print '[PRODUCER] Consumer return: %s' % r
        c.close()
    
    if __name__ == '__main__':
        c = consumer()
        produce(c)
    
    # 结果
    [PRODUCER] Producing 1...
    [CONSUMER] Consuming 1...
    [PRODUCER] Consumer return: 200 OK
    [PRODUCER] Producing 2...
    [CONSUMER] Consuming 2...
    [PRODUCER] Consumer return: 200 OK
    [PRODUCER] Producing 3...
    [CONSUMER] Consuming 3...
    [PRODUCER] Consumer return: 200 OK
    [PRODUCER] Producing 4...
    [CONSUMER] Consuming 4...
    [PRODUCER] Consumer return: 200 OK
    [PRODUCER] Producing 5...
    [CONSUMER] Consuming 5...
    [PRODUCER] Consumer return: 200 OK
    
    
    
    #声音太小的女演员
    
    def actress():
        message = '[旁白]:actress is 表演中...'
        while True:
            n = yield message  # 中断
            print(n)
            if not n:
                return
            message = '[actress]:Why stout me again ?'
    
    
    def diractor(a):
        a.__next__()  # 中断后下一步
        message = '[diractor]:You should Shout louder !!!'
        for i in range(5):
            response = a.send(message)
            print(response)
    
        a.close()
    
    
    if __name__ == '__main__':
        a = actress()
        diractor(a)
    
    #结果
    D:pythonpython.exe E:/PYTHON/Basics/Fun/HelloYoutube.py
    [diractor]:You should Shout louder !!!
    [actress]:Why stop me again ?
    [diractor]:You should Shout louder !!!
    [actress]:Why stop me again ?
    [diractor]:You should Shout louder !!!
    [actress]:Why stop me again ?
    [diractor]:You should Shout louder !!!
    [actress]:Why stop me again ?
    [diractor]:You should Shout louder !!!
    [actress]:Why stop me again ?
    
    Process finished with exit code 0
    

    怎么有了tcp的感觉(以下是我的胡扯)?

    客户端启动 终端等待服务端回复=> n = yield message
    客户端唤醒服务端 启动=> a.__next__()

    服务端 发=> send(message)
    服务端 收=> response = send(message)

    客户端 发=> yield message
    客户端 收=> n = yield message

    参考

    explore-python/Process-Thread-coroutine


    ________________________________________________________

    Every good deed you do will someday come back to you.

    Love you,love word !
  • 相关阅读:
    C# 反射
    jquery 循环绑定click的问题
    socket 编程
    EF code first出现错误:列名 Discriminator 无效
    C# 两个类是否继承关系
    C# MD5,hmacSHA1
    文件分块上传
    读写CSV到DataTable
    ajax 提交数组 泛型集合(嵌套集合)
    Json中Date映射到model
  • 原文地址:https://www.cnblogs.com/hugboy/p/Process-Thread-Coroutine.html
Copyright © 2011-2022 走看看