  • Processes and threads

    Differences between threads and processes:

    1. Threads share the address space of the process that created them; processes have their own address space.
    2. Threads have direct access to the data segment of their process; a process has its own copy of the data segment of its parent process.
    3. Threads can communicate directly with other threads of their process; processes must use interprocess communication to communicate with sibling processes.
    4. New threads are easily created; new processes require duplication of the parent process.
    5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
    6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process do not affect child processes.
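Point 1 in the list above can be seen in a few lines. The following is a minimal sketch, not part of the original article; the names `shared` and `worker` are made up for illustration. Each thread appends to a module-level list, and the main thread sees every change because all of them live in the same address space.

```python
import threading

shared = []          # lives in the address space of the one process


def worker(tag):
    # The thread appends to the very same list object the main thread sees.
    shared.append(tag)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(shared))
```

A child process running the same function would append to its own copy of the list, which is why processes need the inter-process communication tools covered later.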

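    The Python threading module

    Threads run independently of each other. After a child thread is started, the main thread by default keeps going and does not wait for it: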

    import threading,time
    
    def run(n):
        print('task',n)
        time.sleep(3)
        print("task done")
    
    
    
    
    start_time = time.time()
    for i in range(10):
        t1=threading.Thread(target=run,args=(i,))
        t1.start()
    
    print("finished")                                # the main thread prints this without waiting for the child threads to finish
    print("cost is ",time.time()-start_time)

    Output:
    task 0
    task 1
    task 2
    task 3
    task 4
    task 5
    task 6
    task 7
    task 8
    task 9
    finished
    cost is  0.002000093460083008
    task done
    task done
    task done
    task done
    task done
    task done
    task done
    task done
    task done
    task done

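    How do we make the main thread wait for the child threads before moving on? Call join(), which waits for a thread to finish. In the version below each thread is joined right after it is started, so the threads run one at a time, i.e. serially.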

    import threading,time
    
    def run(n):
        print('task',n)
        time.sleep(3)
        print("task done")
    
    
    
    
    start_time = time.time()
    for i in range(10):
        t1=threading.Thread(target=run,args=(i,))
        t1.start()
        t1.join()  #wait
    
    print("finished")
    print("cost is ",time.time()-start_time)


    Output:
    task 0
    task done
    task 1
    task done
    task 2
    task done
    task 3
    task done
    task 4
    task done
    task 5
    task done
    task 6
    task done
    task 7
    task done
    task 8
    task done
    task 9
    task done
    finished
    cost is  30.00371479988098


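    How can we run the 10 threads in parallel and still measure the total time at the end? Start all of them first, then join each one: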

    import threading,time
    
    def run(n):
        print('task',n)
        time.sleep(3)
        print("task done")
    
    
    
    
    start_time = time.time()
    task_list = []
    for i in range(10):
        t1=threading.Thread(target=run,args=(i,))
        t1.start()
        task_list.append(t1)                       # keep a handle to each thread; starting it does not block
    
    
    for j in task_list :
        j.join()                                   # wait for every thread to finish before the main thread continues
    print("finished")
    print("cost is ",time.time()-start_time)
    
    Output:
    task 0
    task 1
    task 2
    task 3
    task 4
    task 5
    task 6
    task 7
    task 8
    task 9
    task done
    task done
    task done
    task done
    task done
    task done
    task done
    task done
    task done
    task done
    finished
    cost is  3.012172222137451

    Daemon threads: the program exits as soon as all non-daemon threads have finished

    import threading,time
    
    def run(n):
        print('task',n)
        time.sleep(3)
        print("task done")
    
    
    
    
    start_time = time.time()
    task_list = []
    for i in range(10):
        t1=threading.Thread(target=run,args=(i,))
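        t1.daemon = True                 # mark every child thread as a daemon (setDaemon() is deprecated since Python 3.10): the program exits when the main thread ends, without waiting for the children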
        t1.start()
        task_list.append(t1)
    
    
    #for j in task_list :
    #    j.join()
    print("finished")
    print("cost is ",time.time()-start_time)
    Output:
    
    task 0
    task 1
    task 2
    task 3
    task 4
    task 5
    task 6
    task 7
    task 8
    task 9
    finished
    cost is  0.0030002593994140625

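    Python GIL (Global Interpreter Lock)

    CPython is the default Python implementation in most environments. Under the CPython interpreter, no matter how many threads you start or how many CPUs you have, only one thread is allowed to execute Python code at any given moment.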
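The effect of the GIL described above can be observed with a CPU-bound loop. This is a rough sketch, not a benchmark; `count_down` and `N` are names chosen for illustration, and the timings depend on the machine and Python build.

```python
import sys
import threading
import time


def count_down(n):
    # Pure-Python CPU-bound loop; the running thread holds the GIL.
    while n > 0:
        n -= 1


N = 2_000_000

# Run the loop twice, one call after the other.
start = time.perf_counter()
count_down(N)
count_down(N)
sequential = time.perf_counter() - start

# Run the same two calls in two threads.
start = time.perf_counter()
threads = [threading.Thread(target=count_down, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - start

print("switch interval:", sys.getswitchinterval())
print("sequential: %.3fs, two threads: %.3fs" % (sequential, threaded))
```

On a standard CPython build the two timings are usually close to each other, because the two threads take turns holding the GIL instead of running on two cores.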

    Sharing data between child threads:

    import threading,time
    
    num = 0
    def run(n):
        global num
        print 'task',n
        num +=1
        time.sleep(0.3)
        print "task done"
    
    
    
    
    start_time = time.time()
    task_list = []
    for i in range(100000):
        t1=threading.Thread(target=run,args=(i,))
        t1.start()
        task_list.append(t1)
    
    
    for j in task_list :
        j.join()
    print  num
    
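    Running it on Python 2.6 gives:
    
    99982  # why 

    On Python 3.x the result usually comes out right.
    *Note: this is not because Python 3 adds a lock for you. Since Python 3.2 the interpreter switches threads on a time interval instead of after a fixed number of bytecode instructions, so a function this short is rarely interrupted in the middle of num += 1. The update is still not atomic, and the race is still there.

    The result is 99982 rather than 100000.

    How do we keep the count correct? Take a lock around the modification, so that the modification itself runs serially while everything else still runs in parallel.

    GIL vs. Lock

    Python already has the GIL to make sure only one thread executes at a time, so why is a lock still needed here? Note that this lock is a user-level lock and has nothing to do with the GIL. The GIL only protects the interpreter's own state; num += 1 is a read, an add and a write, and a thread switch between those steps loses an update.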
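One way to see why a user-level lock is still needed is to look at the bytecode behind `num += 1`. The sketch below uses the standard `dis` module; the function name `increment` is made up for illustration, and the exact instruction names vary between Python versions.

```python
import dis

num = 0


def increment():
    global num
    num += 1


# List the bytecode instructions behind the single source line `num += 1`.
opnames = [ins.opname for ins in dis.get_instructions(increment)]
print(opnames)
```

The listing shows a separate load of the global and a later store back to it. If another thread runs between those two steps, both threads can store the same value and one increment is lost.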

    Version with a lock:

    import threading,time
    
    num = 0
    def run(n):
        lock.acquire()           # take the lock before updating the counter
        global num
        print('task',n)
    
        num +=1
        lock.release()          # release right after the update, so the sleep below is not serialized
    
        time.sleep(1)
        print("task done")
    
    
    lock = threading.Lock()
    
    start_time = time.time()
    task_list = []
    for i in range(1000):
        t1=threading.Thread(target=run,args=(i,))
        t1.start()
        task_list.append(t1)
    
    
    for j in task_list :
        j.join()
    print(num)
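The acquire/release pair above can also be written with a `with` block, which releases the lock even if the body raises. A minimal sketch, with the hypothetical helper `add_many` and thread counts chosen only for illustration:

```python
import threading

num = 0
lock = threading.Lock()


def add_many(times):
    global num
    for _ in range(times):
        with lock:          # acquired here, released when the block exits
            num += 1


threads = [threading.Thread(target=add_many, args=(10000,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(num)   # 80000
```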

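    RLock (reentrant lock)

    Put simply, an RLock is for code that holds a lock and needs to acquire the same lock again, i.e. a lock nested inside an outer lock. With a plain Lock, the nested acquire would block forever.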

    import threading,time
     
    def run1():
        print("grab the first part data")
        lock.acquire()
        global num
        num +=1
        lock.release()
        return num
    def run2():
        print("grab the second part data")
        lock.acquire()
        global  num2
        num2+=1
        lock.release()
        return num2
    def run3():
        lock.acquire()
        res = run1()
        print('--------between run1 and run2-----')
        res2 = run2()
        lock.release()
        print(res,res2)
     
     
    if __name__ == '__main__':
     
        num,num2 = 0,0
        lock = threading.RLock()
        for i in range(10):
            t = threading.Thread(target=run3)
            t.start()
     
        while threading.active_count() != 1:    # 11 threads in total, counting the main thread
            print(threading.active_count())
        else:
            print('----all threads done---')
            print(num,num2)
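The difference between the two lock types can be checked without risking a deadlock by making the second acquire non-blocking. This is a small sketch; the variable names are made up for illustration.

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

plain.acquire()
# A blocking second acquire from the same thread would hang, so try without blocking.
second_plain = plain.acquire(blocking=False)
plain.release()

reentrant.acquire()
second_reentrant = reentrant.acquire(blocking=False)  # same thread, so it succeeds
reentrant.release()   # one release per successful acquire
reentrant.release()

print(second_plain, second_reentrant)   # False True
```

This is why run3() in the example above can hold the lock while run1() and run2() acquire it again.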

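    Semaphore

    A mutex lets only one thread modify the data at a time, while a Semaphore lets a fixed number of threads do so at once. For example, if a restroom has 3 stalls, at most 3 people can use it at the same time; everyone else has to wait until someone comes out.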

    import threading,time
    
    def run(n):
        semaphore.acquire()
        print('task',n)
        time.sleep(1)
        semaphore.release()
    
    
        print("task done")
    
    
    semaphore = threading.BoundedSemaphore(5)
    
    start_time = time.time()
    task_list = []
    for i in range(23):
        t1=threading.Thread(target=run,args=(i,))
        t1.start()
        task_list.append(t1)
    
    
    for j in task_list :
        j.join()

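    # Caps the number of threads running at the same time, so the system is not overwhelmed
    # Only 5 threads run at once; when one finishes, another is let in
    Output: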
    task 0
    task 1
    task 2
    task 3
    task 4
    task done
    task done
    task 6
    task done
    task done
    task 8
    task 7
    task 5
    task done
    task 9
    task done
    task done
    task 10
    task done
    task done
    task 11
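To confirm that the semaphore really caps concurrency, the sketch below counts how many threads are inside the guarded section at once. The names `LIMIT`, `active` and `max_active` are made up for illustration, and a limit of 3 is used to match the restroom analogy.

```python
import threading
import time

LIMIT = 3
semaphore = threading.BoundedSemaphore(LIMIT)
counter_lock = threading.Lock()
active = 0
max_active = 0


def run(n):
    global active, max_active
    with semaphore:                 # at most LIMIT threads get past this line
        with counter_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)            # pretend to work
        with counter_lock:
            active -= 1


threads = [threading.Thread(target=run, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("peak concurrency:", max_active)
```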

    Events

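    An Event lets two or more threads coordinate with each other.

    An Event manages an internal flag: threads can wait for the flag to be set, or set or clear the flag themselves.

    event = threading.Event()

    # a client thread can wait for the flag to be set
    event.wait()   # blocks until the flag is set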

    # a server thread can set or reset it
    event.set()
    event.clear()

    import threading

    def lighter():
        # Toggles the flag in a busy loop: set while 20 < num < 30, cleared once num > 30.
        num = 0
        while True:
            if num > 20 and num < 30:
                event.set()
            elif num > 30:
                event.clear()
                num = 0
            num += 1
    
    def car():
        if event.is_set():
            print('Tesla is allowed to go.')
        else:
            print('Tesla is not going.')
    
    event = threading.Event()
    
    light = threading.Thread(target=lighter, daemon=True)   # daemon, so the endless loop does not keep the program alive
    light.start()
    
    for i in range(100):
        my_car = threading.Thread(target=car)
        my_car.start()
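The cars above only check the flag once. A variant in which each car blocks on `event.wait()` until the light turns green might look like the sketch below; the `log` list and the 5-second timeout are assumptions added so the example cannot hang.

```python
import threading

event = threading.Event()
log = []


def car(name):
    # wait() blocks until the flag is set; the timeout keeps the sketch from hanging.
    if event.wait(timeout=5):
        log.append(name + " goes")
    else:
        log.append(name + " gave up")


cars = [threading.Thread(target=car, args=("car%d" % i,)) for i in range(3)]
for c in cars:
    c.start()

event.set()          # green light: every waiting thread is released
for c in cars:
    c.join()

print(sorted(log))
```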

    queue:

    Difference between a queue and a list: an item taken from a queue is removed from it, whereas reading an item from a list leaves it in the list.

    # Python 2 environment:

    >>> import Queue
    >>> q = Queue.Queue()
    >>> q.qsize()
    0
    >>> q.put('d1')
    >>> q.put('d2')
    >>> q.put('d3')
    >>> q.qsize()
    3
    >>> q.get()
    'd1'
    >>> q.get()
    'd2'
    >>> q.get()
    'd3'
    >>>
    >>> q.get_nowait()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/lib64/python2.6/Queue.py", line 190, in get_nowait
        return self.get(False)
      File "/usr/lib64/python2.6/Queue.py", line 165, in get
        raise Empty
    Queue.Empty
    >>> q.qsize()
    0
    >>>
    >>> q.get(block=False)
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/lib64/python2.6/Queue.py", line 165, in get
        raise Empty
    Queue.Empty
    >>> q.get(timeout=1)
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/lib64/python2.6/Queue.py", line 176, in get
        raise Empty
    Queue.Empty
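The session above is from Python 2. In Python 3 the module is named `queue` and the exception is `queue.Empty`. A minimal sketch of the same calls with the exception handled; the variable names are made up for illustration.

```python
import queue

q = queue.Queue()
q.put("d1")

first = q.get()              # 'd1'

try:
    q.get_nowait()           # same as q.get(block=False)
    outcome = "got an item"
except queue.Empty:
    outcome = "queue is empty"

try:
    q.get(timeout=0.1)       # waits up to 0.1 s, then raises queue.Empty
    timed = "got an item"
except queue.Empty:
    timed = "timed out"

print(first, outcome, timed)
```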

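    Common operations:

    >>> q = Queue.Queue(maxsize=3)   # set the maximum queue length; first in, first out
    >>> q = Queue.LifoQueue() # last in, first out

    >>> q = Queue.PriorityQueue() # items are retrieved in priority order, lowest value first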
    >>> q.put((6,'zhang'))
    >>> q.put((-1,'li'))
    >>> q.put((10,'shi'))
    >>> q.get()
    (-1, 'li')
    >>> q.get()
    (6, 'zhang')
    >>> q.get()
    (10, 'shi')
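The three queue types differ only in the order items come back out. The sketch below, written for Python 3, feeds the same items into each one; `drain` is a hypothetical helper.

```python
import queue

fifo = queue.Queue()
lifo = queue.LifoQueue()
prio = queue.PriorityQueue()

for item in [(6, "zhang"), (-1, "li"), (10, "shi")]:
    fifo.put(item)
    lifo.put(item)
    prio.put(item)


def drain(q):
    # Pull items until the queue is empty (single-threaded, so empty() is reliable here).
    out = []
    while not q.empty():
        out.append(q.get())
    return out


fifo_order = drain(fifo)
lifo_order = drain(lifo)
prio_order = drain(prio)
print(fifo_order)   # insertion order
print(lifo_order)   # reverse insertion order
print(prio_order)   # sorted by the first tuple element
```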

    Queue.qsize()
    Queue.empty()   # return True if empty
    Queue.full()    # return True if full
    Queue.put(item, block=True, timeout=None)
    Queue.get(block=True, timeout=None)
    Queue.task_done()

    Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

    If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

    Raises a ValueError if called more times than there were items placed in the queue.

    Queue.join() blocks until every item in the queue has been retrieved and marked done with task_done().
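How task_done() and join() work together can be sketched with a single worker thread. The names `worker` and `processed` are made up for illustration, and the code targets Python 3.

```python
import queue
import threading

q = queue.Queue()
processed = []


def worker():
    while True:
        item = q.get()
        processed.append(item * 2)
        q.task_done()            # one task_done() per get()


# A daemon thread, so the endless loop does not keep the program alive.
threading.Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)

q.join()                          # returns once all 5 items are marked done
print(processed)   # [0, 2, 4, 6, 8]
```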



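    The producer-consumer model

    What is the producer-consumer pattern?

    The producer-consumer pattern uses a container to remove the tight coupling between producers and consumers. Producers and consumers do not talk to each other directly; they communicate through a blocking queue. A producer does not wait for a consumer to process the data it has produced and simply puts it on the queue; a consumer does not ask a producer for data and simply takes it from the queue. The blocking queue acts as a buffer that balances the processing capacity of producers and consumers.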

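    import queue
    import threading
    import time
    
    q = queue.Queue()
    
    
    def Producer(name):
        for i in range(10):
            q.put('meat %s' % i)
            print("meat %s is ready" % i)
        q.join()                         # blocks until task_done() has been called for every item
        print("all the meat has been eaten...")
    
    
    def Consumer(name):
        # Stops as soon as the queue looks empty, so it relies on the producer having filled it first.
        while q.qsize() > 0:
            food = q.get()
            print("%s eats %s" % (name, food))
            time.sleep(0.5)
            q.task_done()
        print("end")
    
    
    p = threading.Thread(target=Producer, args=('alex',))
    c = threading.Thread(target=Consumer, args=('zhang',))
    p.start()
    c.start()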
    # An example that keeps producing and keeps consuming

    import queue
    import threading
    import time

    q = queue.Queue(maxsize=10)


    def Producer(name):
        count = 1
        while True:
            q.put("bone %s" % count)
            print("produced bone", count)
            count += 1
            time.sleep(5)


    def Consumer(name):
        while True:
            # get() blocks until an item is available, then execution continues
            print("[%s] got [%s] and ate it..." % (name, q.get()))


    p = threading.Thread(target=Producer, args=("Alex",))
    c = threading.Thread(target=Consumer, args=("ChengRonghua",))
    c1 = threading.Thread(target=Consumer, args=("zhang",))


    p.start()
    c.start()
    c1.start()
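The example above never stops. One common way to shut the consumers down cleanly is to send a sentinel value through the queue. The sketch below assumes `None` as the sentinel (named `STOP` here) and uses made-up names throughout; it is one possible approach, not the original author's.

```python
import queue
import threading

q = queue.Queue(maxsize=3)       # put() blocks while the queue is full
STOP = None                      # hypothetical sentinel meaning "no more items"
eaten = []
eaten_lock = threading.Lock()


def producer(count, consumers):
    for i in range(count):
        q.put("bone%d" % i)
    for _ in range(consumers):
        q.put(STOP)              # one sentinel per consumer


def consumer(name):
    while True:
        item = q.get()
        if item is STOP:
            break
        with eaten_lock:
            eaten.append((name, item))


workers = [threading.Thread(target=consumer, args=(n,)) for n in ("c1", "c2")]
p = threading.Thread(target=producer, args=(6, len(workers)))
for t in workers + [p]:
    t.start()
for t in workers + [p]:
    t.join()

print(sorted(item for _, item in eaten))
```

Because the sentinels are queued after the real items, every item is consumed before the consumers exit.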
    
    

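    Timer:

    A Timer runs a function once after a delay. Having the function start a new Timer each time it runs makes the task repeat at a fixed interval.

    from threading import Timer
    def foo():
        print(30)
        Timer(1,foo).start()   # schedule the next run
    
    
    if __name__ == '__main__':
        Timer(1,foo).start()   # after 1 second, "30" will be printed
    
    # Output: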
    30
    30
    30
    30
    30
    ....
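The loop above repeats forever. A pending Timer can be stopped with cancel() as long as its delay has not elapsed yet. A small sketch, with made-up names and a short 0.05 second delay:

```python
import threading

fired = []

t1 = threading.Timer(0.05, fired.append, args=("t1",))
t2 = threading.Timer(0.05, fired.append, args=("t2",))
t1.start()
t2.start()
t2.cancel()        # cancelled before its delay elapsed, so it never runs

t1.join()          # Timer is a Thread subclass, so join() waits for it
t2.join()
print(fired)   # ['t1']
```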

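    Multithreading and multiprocessing in Python:

    1.  With Python (CPython) threads, only one thread runs at any given moment. What looks like parallelism is the CPU switching context between threads.

    2.  Python threads are a poor fit for CPU-bound tasks and a good fit for I/O-bound tasks.

    I/O operations spend their time waiting and barely use the CPU.

    Computation uses the CPU.

    3. Python multiprocessing can use multiple CPU cores.

    Multiprocessing with the multiprocessing module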

    import multiprocessing
    import time
    import os
    
    def info(message):
        print(message)
        print("parent pid.", os.getppid())
        print("pid.", os.getpid())
    
    
    def run(name):
        info("child process...")
        print("hello", name)
    
    
    if __name__ == "__main__":
        info("current process...")
        p = multiprocessing.Process(target=run,args=('zhang',))
        p.start()


    # Output:
    current process...
    parent pid. 1674004
    pid. 1737051
    child process...
    parent pid. 1737051
    pid. 1737052
    hello zhang


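    Inter-process communication

    Memory is not shared between processes. To exchange data between two processes, use one of the following:

    Process Queues (for communication between two processes. The Queue pickles the data on one side and unpickles it on the other, so the receiver gets a copy. This differs from the thread queue.Queue, where threads access the same object directly.)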

    from  multiprocessing import Process,Queue
    import time
    
    
    def run(qq):
        qq.put('hello')
    
    if __name__ =='__main__':
        q = Queue()
    
        p = Process(target=run,args=(q,))
    
        p.start()
        print(q.get())

    Pipes

    from multiprocessing import Process, Pipe
     
    def f(conn):
        conn.send([42, None, 'hello'])
        print(conn.recv()) 
        conn.close()
     
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
        parent_conn.send('ok?')
        p.join()

    Process Queues and Pipes pass data between processes. How can processes share data? Through Managers.

    Managers

    from multiprocessing import Process, Manager
    import os
    
    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.append(os.getpid())
        print(l)
    
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()       # create a dict shared between processes
     
            l = manager.list(range(5))   # create a list shared between processes
            p_list = []
            for i in range(10):
                p = Process(target=f, args=(d, l))
                p.start()
                p_list.append(p)
            for res in p_list:
                res.join()
    
            print(d)
            print(l)

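    Process pools

    Concept: a pool limits how many processes run at the same time.

    A process pool maintains a set of worker processes. When work is submitted, a process is taken from the pool; if no process is free, the program waits until one becomes available.

    A process pool has two methods for submitting work:

    • apply                             serial
    • apply_async                  parallel

    from multiprocessing import Process,Pool
    import os
    import time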
    
    
    def f(i):
        print("in process %s ..." % i,os.getpid())
        time.sleep(1)
        return i + 100
    
    
    if __name__ == '__main__':
        pool = Pool(5)
    
    
        for i in range(10):
            pool.apply(func=f,args=(i,))    # serial: blocks until the call returns
    
        print('end')
        pool.close()
        pool.join()

    # Output:

    in process 0 ... 9500
    in process 1 ... 4876
    in process 2 ... 10416
    in process 3 ... 11204
    in process 4 ... 9792
    in process 5 ... 9500
    in process 6 ... 4876
    in process 7 ... 10416
    in process 8 ... 11204
    in process 9 ... 9792
    end

    from multiprocessing import Process,Pool
    import os
    import time
    
    
    def f(i):
        print("in process %s ..." % i,os.getpid())
        time.sleep(1)
        return i + 100
    
    
    if __name__ == '__main__':
        pool = Pool(5)
    
    
        for i in range(10):                      # all 10 tasks are submitted at once; 5 run while the other 5 wait
            pool.apply_async(func=f,args=(i,))   # parallel: returns immediately
    
        print('end')
        pool.close()
        pool.join()
    
    # Output:
    end
    in process 0 ... 9148
    in process 1 ... 6692
    in process 2 ... 7128
    in process 3 ... 9476
    in process 4 ... 10056
    in process 5 ... 9148
    in process 6 ... 6692
    in process 7 ... 7128
    in process 8 ... 9476
    in process 9 ... 10056
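The apply_async example above throws away the value returned by f. Each apply_async call returns a result handle whose get() method waits for that task and returns its value. A minimal sketch under that assumption; `collect` is a hypothetical helper and the 30-second timeout is an arbitrary safety margin.

```python
import time
from multiprocessing import Pool


def f(i):
    time.sleep(0.1)
    return i + 100


def collect(n, workers=5):
    # hypothetical helper: submit n tasks, then gather their return values in order
    with Pool(workers) as pool:
        pending = [pool.apply_async(f, args=(i,)) for i in range(n)]
        return [p.get(timeout=30) for p in pending]


if __name__ == '__main__':
    print(collect(10))
```

Collecting the handles first and calling get() afterwards keeps the tasks running in parallel; calling get() right after each apply_async would make them run one at a time, like apply.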

    callback: when a task finishes, the parent process runs the callback with the task's return value

    from multiprocessing import Process,Pool
    import os
    import time
    
    
    def f(i):
        print("in process %s ..." % i,os.getpid())
        time.sleep(1)
        return i + 100
    
    def bar(arg):
        print("-->exec is done...",arg,os.getpid())
    
    if __name__ == '__main__':
        pool = Pool(5)
    
        print(os.getpid())
        for i in range(10):
            pool.apply_async(func=f,args=(i,),callback=bar)
    
        print('end')
        pool.close()
        pool.join() # wait for the workers to finish all tasks; if this line is commented out, the program exits right away.
    
    # Output:
    4288
    end
    in process 0 ... 8588
    in process 1 ... 8404
    in process 2 ... 10680
    -->exec is done... 100 4288
    in process 3 ... 8588
    in process 4 ... 9884
    in process 5 ... 9192
    -->exec is done... 101 4288
    in process 6 ... 8404
    -->exec is done... 102 4288
    in process 7 ... 10680
    -->exec is done... 103 4288
    in process 8 ... 8588
    in process 9 ... 9884
    -->exec is done... 104 4288
    -->exec is done... 105 4288
    -->exec is done... 106 4288
    -->exec is done... 107 4288
    -->exec is done... 108 4288
    -->exec is done... 109 4288
  • Original article: https://www.cnblogs.com/mao3714/p/8178283.html