zoukankan      html  css  js  c++  java
  • 线程、进程、携程

    进程: qq 要以一个整体的形式暴露给操作系统管理,里面包含对各种资源的调用,内存的管理,网络接口的调用等。。。对各种资源管理的集合 就可以成为 进程

    线程: 是操作系统最小的调度单位, 是一串指令的集合(在python中同一时间执行的线程只有一个python多线程 不适合cpu密集操作型的任务,适合io操作密集型的任务

    进程 要操作cpu , 必须要先创建一个线程 ,所有在同一个进程里的线程是共享同一块内存空间的

    进程与线程的区别?
    1.线程共享内存空间,进程的内存是独立的

    2.同一个进程的线程之间可以直接交流,两个进程想通信,必须通过一个中间代理来实现

    3.创建新线程很简单, 创建新进程需要对其父进程进行一次克隆

    4.一个线程可以控制和操作同一进程里的其他线程,但是进程只能操作子进程

    启动线程:(主线程不等子线程)程序在退出之前默认等待所有线程执行完毕

    import threading
    import time
    
    def run(n):
        print("task ",n )
        time.sleep(2)
        print("task done",n)
    
    start_time = time.time()
    for i in range(50):
        t = threading.Thread(target=run,args=("t-%s" %i ,))
        t.start()
    
    print("----------all threads has finished...")
    print("cost:",time.time() - start_time)

    启动线程:(主线程等待子线程执行完毕)

    import threading
    import time
    
    def run(n):
        print("task ",n )
        time.sleep(2)
        print("task done",n)
    
    start_time = time.time()
    t_objs = [] #存线程实例
    for i in range(50):
        t = threading.Thread(target=run,args=("t-%s" %i ,))
        t.start()
        t_objs.append(t) #为了不阻塞后面线程的启动,不在这里join,先放到一个列表里
    
    for t in t_objs: #循环线程实例列表,等待所有线程执行完毕
        t.join()
    
    print("----------all threads has finished...")
    print("cost:",time.time() - start_time)

    守护进程(即守护线程):以上主线程等待子线程执行完毕的过程中,虽然主线程没有等待子线程,但是在程序结束的时候默认等待所有的子线程结束。而守护进程在程序结束的时候是不需要等待守护进程执行结束的

    import threading
    import time
    
    def run(n):
        print("task ",n )
        time.sleep(2)
        print("task done",n,threading.current_thread())
    
    start_time = time.time()
    for i in range(50):
        t = threading.Thread(target=run,args=("t-%s" %i ,))
        t.setDaemon(True) #把当前线程设置为守护线程
        t.start()
    
    #print("----------all threads has finished...",threading.current_thread(),threading.active_count())
    print("cost:",time.time() - start_time)

    线程锁(也叫互斥锁)的应用:(防止多个线程对同一份资源的处理发生冲突, 上完锁之后程序变成了串行)python3程序自己上锁了

    import threading
    import time
    
    def run(n):
        lock.acquire()
        global  num
        num +=1
        time.sleep(1)
        lock.release()
    
    lock = threading.Lock()
    num = 0
    t_objs = [] #存线程实例
    for i in range(50):
        t = threading.Thread(target=run,args=("t-%s" %i ,))
        t.start()
        t_objs.append(t) #为了不阻塞后面线程的启动,不在这里join,先放到一个列表里
    
    for t in t_objs: #循环线程实例列表,等待所有线程执行完毕
        t.join()print("num:",num)

    递归锁的应用:(如果出现锁中有锁就需要递归锁,如果不用递归锁而把锁的顺序混淆就会出现卡死现象)

    import threading
    
    def run1():
        print("grab the first part data")
        lock.acquire()
        global num
        num += 1
        lock.release()
        return num
    
    def run2():
        print("grab the second part data")
        lock.acquire()
        global num2
        num2 += 1
        lock.release()
        return num2
    
    def run3():
        lock.acquire()
        res = run1()
        print('--------between run1 and run2-----')
        res2 = run2()
        lock.release()
        print(res, res2)
    
    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(1):
        t = threading.Thread(target=run3)
        t.start()
    
    while threading.active_count() != 1:
        print(threading.active_count())
    else:
        print('----all threads done---')
        print(num, num2)

    信号量:(互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据)

    import threading, time
    
    def run(n):
        semaphore.acquire()
        time.sleep(1)
        print("run the thread: %s
    " % n)
        semaphore.release()
    
    if __name__ == '__main__':
        semaphore = threading.BoundedSemaphore(5)  # 最多允许5个线程同时运行
        for i in range(22):
            t = threading.Thread(target=run, args=(i,))
            t.start()
    while threading.active_count() != 1:
        pass  # print threading.active_count()
    else:
        print('----all threads done---')

    Event:(来实现两个或多个线程间的交互)

    import time
    import threading
    
    event = threading.Event()
    
    def lighter():
        count = 0
        event.set() #先设置绿灯
        while True:
            if count >5 and count < 10: #改成红灯
                event.clear() #把标志位清了
                print("33[41;1mred light is on....33[0m")
            elif count >10:
                event.set() #变绿灯
                count = 0
            else:
                print("33[42;1mgreen light is on....33[0m")
            time.sleep(1)
            count +=1
    
    def car(name):
        while True:
            if event.is_set(): #代表绿灯
                print("[%s] running..."% name )
                time.sleep(1)
            else:
                print("[%s] sees red light , waiting...." %name)
                event.wait()
                print("33[34;1m[%s] green light is on, start going...33[0m" %name)
    
    light = threading.Thread(target=lighter,)
    light.start()
    
    car1 = threading.Thread(target=car,args=("Tesla",))
    car1.start()

    queue队列:(作用是解耦和提高效率

    import threading,time
    import queue
    
    q = queue.Queue(maxsize=10)
    
    def Producer(name):
        count = 1
        while True:
            q.put("骨头%s" % count)
            print("生产了骨头",count)
            count +=1
            time.sleep(0.1)
    
    def  Consumer(name):
        #while q.qsize()>0:
        while True:
            print("[%s] 取到[%s] 并且吃了它..." %(name, q.get()))
            time.sleep(1)
    
    p = threading.Thread(target=Producer,args=("Alex",))
    c = threading.Thread(target=Consumer,args=("ChengRonghua",))
    c1 = threading.Thread(target=Consumer,args=("王森",))
    
    p.start()
    c.start()
    c1.start()

    多进程:用法上和多线程一样。每个子进程都是由父进程启动的。

    import multiprocessing
    import time
    
    def run(name):
        time.sleep(2)
        print("hello", name)
    
    if __name__ == "__main__":
        p = multiprocessing.Process(target=run, args=('bob', ))
        p.start()
        p.join()

    父进程启动子进程:

    from multiprocessing import Process
    import os
    
    def info(title):
        print(title)
        print('module name:', __name__)
        print('parent process:', os.getppid())
        print('process id:', os.getpid())
        print("
    
    ")
    
    def f(name):
        info('33[31;1mcalled from child process function f33[0m')
        print('hello', name)
    
    if __name__ == '__main__':
        info('33[32;1mmain process line33[0m')
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()

    进程间通讯(Queue):(以下这个例子中父进程的队列取到了子进程中队列的数,这其实是子进程从父进程中克隆了一份队列,然后这两个进程中的队列通过pickle进行互通)这只是实现了数据的传输

    from multiprocessing import Process, Queue
    
    def f(qq):
        qq.put([42, None, 'hello'])
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())

    进程间通讯(Pipe):实现了数据的传递。

    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send([42, None, 'hello from child'])
        conn.send([42, None, 'hello from child2'])
        print("from parent:",conn.recv())
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())  # prints "[42, None, 'hello']"
        print(parent_conn.recv())  # prints "[42, None, 'hello']"
        parent_conn.send("张洋可好") # prints "[42, None, 'hello']"
        p.join()

    进程间通讯(Manager):实现了进程间数据的共享和传输。

    from multiprocessing import Process, Manager
    import os
    def f(d, l):
        d[os.getpid()] =os.getpid()
        l.append(os.getpid())
        print(l)
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict() #生成一个字典,可在多个进程间共享和传递
    
            l = manager.list(range(5))#生成一个列表,可在多个进程间共享和传递
            p_list = []
            for i in range(10):
                p = Process(target=f, args=(d, l))
                p.start()
                p_list.append(p)
            for res in p_list: #等待结果
                res.join()
    
            print(d)
            print(l)

     进程锁:虽然进程间的数据是相互独立的,但是他们在打印的时候需要占用同一块屏幕,所以需要锁来保证打印不会混乱。

    from multiprocessing import Process, Lock
    
    def f(l, i):
        l.acquire()
        print('hello world', i)
        l.release()
    
    if __name__ == '__main__':
        lock = Lock()
        for num in range(100):
            Process(target=f, args=(lock, num)).start()

    进程池:同一时间由多少进程在运行。以下的回调函数是主进程调用的

    from  multiprocessing import Process, Pool
    import time
    import os
    
    def Foo(i):
        time.sleep(2)
        print("in process",os.getpid())
        return i + 100
    
    def Bar(arg):
        print('-->exec done:', arg,os.getpid())
    
    if __name__ == '__main__':
        pool = Pool(processes=3) #允许进程池同时放入5个进程
        print("主进程",os.getpid())
        for i in range(10):
            pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback=回调
            #pool.apply(func=Foo, args=(i,)) #串行
            #pool.apply_async(func=Foo, args=(i,)) #并行
        print('end')
        pool.close()
        pool.join() #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。.join()

     协程:协程是一种用户态的轻量级线程。(cpu都不知道它的存在)就是在单线程中,适合高并发处理。但不能用于多核

    yield实现简单的携程:

    import timedef consumer(name):
        print("--->starting eating baozi...")
        while True:
            new_baozi = yield
            print("[%s] is eating baozi %s" % (name, new_baozi))
    
    def producer():
        r = con.__next__()
        r = con2.__next__()
        n = 0
        while n < 5:
            n += 1
            con.send(n)
            con2.send(n)
            time.sleep(1)
            print("33[32;1m[producer]33[0m is making baozi %s" % n)
    
    if __name__ == '__main__':
        con = consumer("c1")
        con2 = consumer("c2")
        p = producer()

    greenlet携程:对携程进行了封装,需要手动进行切换

    from greenlet import greenlet
    def test1():
        print(12)
        gr2.switch()
        print(34)
        gr2.switch()
    def test2():
        print(56)
        gr1.switch()
        print(78)
    
    gr1 = greenlet(test1) #启动一个携程
    gr2 = greenlet(test2)
    gr1.switch()

    Gevent:自动进行切换

    import gevent
    
    def foo():
        print('Running in foo')
        gevent.sleep(2)
        print('Explicit context switch to foo again')
    def bar():
        print('Explicit精确的 context内容 to bar')
        gevent.sleep(1)
        print('Implicit context switch back to bar')
    def func3():
        print("running func3 ")
        gevent.sleep(0)
        print("running func3  again ")
    
    gevent.joinall([
        gevent.spawn(foo), #生成一个携程
        gevent.spawn(bar),
        gevent.spawn(func3),
    ])

    用携程爬取内容:

    from urllib import request
    import gevent,time
    from gevent import monkey
    monkey.patch_all() #把当前程序的所有的io操作给我单独的做上标记
    
    def f(url):
        print('GET: %s' % url)
        resp = request.urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))
    
    urls = ['https://www.python.org/',
            'https://www.yahoo.com/',
            'https://github.com/' ]
    time_start = time.time()
    for url in urls:
        f(url)
    print("同步cost",time.time() - time_start)
    async_time_start = time.time()
    gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
    ])
    print("异步cost",time.time() - async_time_start)

    用携程实现socket:

    服务器端:

    import sys
    import socket
    import time
    import gevent
    
    from gevent import socket, monkey
    
    monkey.patch_all()
    
    def server(port):
        s = socket.socket()
        s.bind(('0.0.0.0', port))
        s.listen(500)
        while True:
            cli, addr = s.accept()
            gevent.spawn(handle_request, cli)
    
    def handle_request(conn):
        try:
            while True:
                data = conn.recv(1024)
                print("recv:", data)
                conn.send(data)
                if not data:
                    conn.shutdown(socket.SHUT_WR)
    
        except Exception as  ex:
            print(ex)
        finally:
            conn.close()
    
    if __name__ == '__main__':
        server(8001)

    客户端:

    import socket
    
    HOST = 'localhost'  # The remote host
    PORT = 9999  # The same port as used by the server
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((HOST, PORT))
    while True:
        msg = bytes(input(">>:"), encoding="utf8")
        s.sendall(msg)
        data = s.recv(1024)    print('Received', data)
    s.close()

    事件驱动模型
    事件驱动模型大体思路如下:
    1. 有一个事件(消息)队列;
    2. 鼠标按下时,往这个队列中增加一个点击事件(消息);
    3. 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;
    4. 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;

  • 相关阅读:
    JS动态插入HTML后不能执行后续JQUERY操作
    小程序踩坑+进深
    小程序常用API介绍
    关于小程序你需要知道的事
    小程序app is not defined
    Apache 配置:是否显示文件列表
    小程序分享如何自定义封面?
    微信小程序 的文字复制功能如何实现?
    php-fpm参数优化【转】
    nginx 报错 upstream timed out (110: Connection timed out)解决方案【转】
  • 原文地址:https://www.cnblogs.com/czz0508/p/11066593.html
Copyright © 2011-2022 走看看