  • Python: threads and processes

    Thread syntax

    class Thread(_Verbose):
        """A class that represents a thread of control.
    
        This class can be safely subclassed in a limited fashion.
    
        """
        __initialized = False
        # Need to store a reference to sys.exc_info for printing
        # out exceptions when a thread tries to use a global var. during interp.
        # shutdown and thus raises an exception about trying to perform some
        # operation on/with a NoneType
        __exc_info = _sys.exc_info
        # Keep sys.exc_clear too to clear the exception just before
        # allowing .join() to return.
        __exc_clear = _sys.exc_clear
    
        def __init__(self, group=None, target=None, name=None,
                     args=(), kwargs=None, verbose=None):
            """This constructor should always be called with keyword arguments. Arguments are:
    
            *group* should be None; reserved for future extension when a ThreadGroup
            class is implemented.
    
            *target* is the callable object to be invoked by the run()
            method. Defaults to None, meaning nothing is called.
    
            *name* is the thread name. By default, a unique name is constructed of
            the form "Thread-N" where N is a small decimal number.
    
            *args* is the argument tuple for the target invocation. Defaults to ().
    
            *kwargs* is a dictionary of keyword arguments for the target
            invocation. Defaults to {}.
    
            If a subclass overrides the constructor, it must make sure to invoke
            the base class constructor (Thread.__init__()) before doing anything
            else to the thread.
    
    """
            assert group is None, "group argument must be None for now"
            _Verbose.__init__(self, verbose)
            if kwargs is None:
                kwargs = {}
            self.__target = target
            self.__name = str(name or _newname())
            self.__args = args
            self.__kwargs = kwargs
            self.__daemonic = self._set_daemon()
            self.__ident = None
            self.__started = Event()
            self.__stopped = False
            self.__block = Condition(Lock())
            self.__initialized = True
            # sys.stderr is not stored in the class like
            # sys.exc_info since it can be changed between instances
            self.__stderr = _sys.stderr
    ……

    Threads

    A thread is the smallest unit of execution that the operating system can schedule. It lives inside a process and is the actual working unit within it. A thread is a single sequential flow of control in a process; one process can run multiple threads concurrently, each carrying out a different task.

    1. Direct invocation: pass the function to threading.Thread via target (a minimal sketch follows).
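
    A minimal sketch of the direct-call style, in the same Python 2 syntax used throughout this post (the function name worker is illustrative):

    import threading
    
    # the function each thread will execute
    def worker(n):
        print "worker %s running" % n
    
    t1 = threading.Thread(target=worker, args=(1,))
    t2 = threading.Thread(target=worker, args=(2,))
    t1.start()
    t2.start()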

    2. Subclass invocation: subclass Thread and override its run() method (a sketch follows).
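
    A sketch of the subclass style (the class name MyThread is illustrative); the subclass invokes the base constructor first and then overrides run():

    import threading
    
    class MyThread(threading.Thread):
        def __init__(self, num):
            # always invoke the base class constructor before anything else
            threading.Thread.__init__(self)
            self.num = num
    
        def run(self):
            # start() arranges for run() to execute in the new thread
            print "running on number :%s" % self.num
    
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()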

    3. Running threads in parallel: join() blocks until the thread has finished, and only then does execution continue (a sketch follows).
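
    A sketch of join() in action, with illustrative names: the main thread blocks on each join() call, and because the three workers sleep in parallel the total elapsed time is roughly 2 seconds rather than 6:

    import threading
    import time
    
    def work():
        time.sleep(2)
    
    start = time.time()
    threads = [threading.Thread(target=work) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # block until this thread has finished
    print "elapsed:", time.time() - start  # about 2 seconds, not 6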

    4. Daemon threads

    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    __Author__ = 'KongZhaGen'
    import threading
    import time
    
    # function executed by each worker thread
    def sayhi(num):
        print "running on number :%s"%num
        time.sleep(3)
        print 'done',num
    
    # main() starts 5 threads at the same time
    def main():
        for i in range(5):
            t = threading.Thread(target=sayhi,args=(i,))
            t.start()
    
    # thread m runs the main() function
    m = threading.Thread(target=main,args=())
    # mark m as a daemon thread
    m.setDaemon(True)
    m.start()
    # the worker threads started inside the daemon thread inherit daemon status; when the
    # main program exits they are all killed, and execution continues with the code below
    m.join(timeout=2)
    print "---------main thread done-----"
    

    5. Thread locks

      Without a lock

    A process can start multiple threads, and all of them share the parent process's memory space, which means every thread can access the same data. So what happens if two threads try to modify the same piece of data at the same time?

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    
    import time
    import threading
    
    def addNum():
        global num
        print '---getNum--',num
        time.sleep(1)
        num -= 1
    
    num = 100
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=addNum)  # start 100 threads at once
        t.start()
        thread_list.append(t)
    
    for t in thread_list:
        t.join()
    
    print "final num",num
    

      The result: sometimes 0, sometimes some other value, because num -= 1 is not atomic and concurrent updates can overwrite each other.

      After adding a lock...

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    
    import time
    import threading
    
    def addNum():
        global num
        print '---getNum--',num
        time.sleep(1)
        lock.acquire()  # acquire the lock before modifying the shared data
        num -= 1
        lock.release()  # release the lock after the modification
    
    num = 100
    thread_list = []
    lock = threading.Lock()  # create one global lock
    
    for i in range(100):
        t = threading.Thread(target=addNum)  # start 100 threads at once
        t.start()
        thread_list.append(t)
    
    for t in thread_list:
        t.join()
    
    print "final num",num
    

      The result is always 0, which shows the lock is doing its job.

    Semaphore

    A mutex allows only one thread at a time to modify the data, whereas a Semaphore allows a fixed number of threads to do so concurrently.
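
    A minimal sketch of a semaphore, assuming we want at most 3 threads inside the critical section at once (the limit and the names are illustrative):

    import threading
    import time
    
    semaphore = threading.BoundedSemaphore(3)  # at most 3 threads may hold it at once
    
    def run(n):
        semaphore.acquire()
        time.sleep(1)
        print "thread %s running" % n
        semaphore.release()
    
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()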

    Timer

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    import threading
    
    def hello():
        print "Hello World.."
    
    t = threading.Timer(20,hello)  # run hello() after 20 seconds
    t.start()
    

      

    Queues: the Queue module

    Queue.qsize() returns the approximate size of the queue
    Queue.empty() returns True if the queue is empty, otherwise False
    Queue.full() returns True if the queue is full, otherwise False (fullness is determined by maxsize)
    Queue.get([block[, timeout]]) removes and returns an item; timeout is how long to wait for one
    Queue.get_nowait() is equivalent to Queue.get(False), i.e. non-blocking
    Queue.put(item[, block[, timeout]]) puts an item into the queue; timeout is how long to wait for free space
    Queue.put_nowait(item) is equivalent to Queue.put(item, False), i.e. non-blocking
    Queue.task_done() signals that a previously fetched item has been fully processed
    Queue.join() blocks until every item in the queue has been processed
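
    A small sketch of the block/timeout parameters, assuming a queue with maxsize=1: a put that times out raises Queue.Full, and a get that times out raises Queue.Empty:

    import Queue
    
    q = Queue.Queue(maxsize=1)
    q.put("a")
    try:
        # the queue is full, so wait at most 1 second and then give up
        q.put("b", block=True, timeout=1)
    except Queue.Full:
        print "put timed out"
    
    print q.get()  # "a"
    try:
        # the queue is now empty, so wait at most 1 second and then give up
        q.get(block=True, timeout=1)
    except Queue.Empty:
        print "get timed out"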

    First in, first out: Queue.Queue

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    import Queue
    
    q = Queue.Queue(maxsize=3)
    q.put(1)
    q.put(3)
    q.put(4)
    # q.put(7)  # a fourth put would block, because maxsize=3
    
    print q.get()  # 1
    print q.get()  # 3
    print q.get()  # 4
    
    # first in, first out
    q.put(7)
    print q.get()
    

      

    Last in, first out: Queue.LifoQueue

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    import Queue
    
    q = Queue.LifoQueue(maxsize=3)
    q.put(1)
    q.put(7)
    q.put(4)
    
    print q.get()  # 4
    print q.get()  # 7
    print q.get()  # 1
    # Result: last in, first out
    

      

    Priority queue: Queue.PriorityQueue

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    import Queue
    
    q = Queue.PriorityQueue(maxsize=4)
    print q.empty()  # True: the queue is empty
    print q.qsize()  # 0 items in the queue
    q.put((1,5))
    q.put((5,1))
    print q.qsize()  # 2 items in the queue
    print q.full()  # False: not full yet
    q.put((3,6))
    print q.full()  # False: not full yet
    q.put((2,6))
    print q.full()  # True: the queue is full
    # q.put_nowait(10)  # a non-blocking put on a full queue raises Queue.Full immediately
    
    print q.get()  # (1, 5)
    print q.get()  # (2, 6)
    print q.get()  # (3, 6)
    print q.get()  # (5, 1)
    # Result: the entry with the smallest priority value comes out first
    print q.get_nowait()   # a non-blocking get on an empty queue raises Queue.Empty immediately
    

      

    The producer-consumer model

    Using the producer-consumer pattern solves the large majority of concurrency problems. By balancing the throughput of producer threads and consumer threads, it improves the overall speed at which a program processes data.

    Why use the producer-consumer pattern

    In the world of threads, a producer is a thread that generates data and a consumer is a thread that processes it. In multithreaded development, if the producer is fast and the consumer is slow, the producer has to wait for the consumer to catch up before it can produce more data. Likewise, if the consumer is faster than the producer, the consumer has to wait for the producer. The producer-consumer pattern was introduced to solve this problem.

    What the producer-consumer pattern is

    The producer-consumer pattern breaks the tight coupling between producers and consumers with a container. Producers and consumers do not talk to each other directly; they communicate through a blocking queue. A producer hands finished data straight to the blocking queue instead of waiting for a consumer, and a consumer takes data from the blocking queue instead of asking a producer for it. The blocking queue acts as a buffer that balances the processing capacity of the two sides.

      The most basic example (one producer, one consumer)

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    import Queue
    import threading
    import time
    
    # producer
    def producer():
        for i in range(5):
            q.put(i)  # put 5 loaves of bread into the queue
        print "Waiting for the bread to be taken...\n"
        q.join()  # block until everything in the queue has been consumed
        print "All the bread has been taken...\n"
    
    # consumer
    def consumer(n):
        # keep consuming while the queue is not empty
        while q.qsize() > 0:
            print "%s took bread %s"%(n, q.get())
            q.task_done()  # report one item as done; once every item is done, join() unblocks
            time.sleep(1)
    
    q = Queue.Queue()
    
    p = threading.Thread(target=producer,)
    p.start()
    
    c = threading.Thread(target=consumer, args=("Kid",))
    c.start()
    

      Result:
    
    Waiting for the bread to be taken...
    
    Kid took bread 0
    Kid took bread 1
    Kid took bread 2
    Kid took bread 3
    Kid took bread 4
    All the bread has been taken...

      Multiple producers and consumers

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    import Queue
    import threading
    import time,random
    
    q = Queue.Queue()
    # producer
    def producer(n):
        count = 0
        # each producer makes 5 loaves of bread
        while count < 5:
            time.sleep(random.randrange(3))
            q.put(count)
            print "%s produced bread"%n
            count += 1
        print "%s finished producing, %s loaves in the queue" %(n, q.qsize())
    
    def consumer(n):
        while True:
            time.sleep(random.randrange(4))
            # eat if there is bread
            if not q.empty():
                print "\033[31;1m%s ate bread %s\033[0m"%(n, q.get())
            else:
                # quit when there is no bread left
                print "No bread left..."
                break
    
    p1 = threading.Thread(target=producer, args=("Chef 1",))
    p2 = threading.Thread(target=producer, args=("Chef 2",))
    p1.start()
    p2.start()
    
    c1 = threading.Thread(target=consumer, args=("Customer 1",))
    c2 = threading.Thread(target=consumer, args=("Customer 2",))
    c1.start()
    c2.start()
    

      

    Inter-process communication

    Queue (multiprocessing.Queue)

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    
    from multiprocessing import Process,Queue
    import time
    q = Queue()
    # put three values into the queue
    def pro_put(q):
        q.put(1)
        q.put(2)
        q.put(3)
    
    if __name__ == '__main__':
        # start a new process that fills the queue
        p = Process(target=pro_put, args=(q,))
        p.start()
        # the parent process reads the values the child process put into the queue
        print q.get()
        print q.get()
        print q.get()
    

      

    Pipe (multiprocessing.Pipe)

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    
    from multiprocessing import Process,Pipe
    
    def f(conn):
        # send values through the pipe
        conn.send(1)
        conn.send(2)
        conn.send(3)
        conn.close()
    
    if __name__ == '__main__':
        # Pipe() returns two connection objects; either end can send and either end can recv
        con_left, con_right = Pipe()
        p = Process(target=f, args=(con_right,))
        p.start()
        # receive the data generated by the child process
        print con_left.recv()
        print con_left.recv()
        print con_left.recv()
        p.join()
    

      

    Process synchronization

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    
    from multiprocessing import Process, Lock
    import time
    
    def f(l, i):
        # when several processes access a shared resource, a Lock avoids conflicting access
        l.acquire()  # serialize the prints so output from different processes doesn't interleave
        try:
            print "hello, ",i
        finally:
            l.release()
    
    if __name__ == '__main__':
        lock = Lock()
        for num in range(10):
            Process(target=f, args=(lock, num)).start()
    

      

    Process pools

    Non-blocking process pool: apply_async

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    from multiprocessing import Pool
    import time,os
    print 'parent:',os.getpid()
    
    def f(msg):
        print 'child:',os.getpid()
        print "start:",msg
        time.sleep(2)
        print "end  :",msg
        print "============"
    
    if __name__ == '__main__':
        # the pool runs at most 2 worker processes at a time
        pool = Pool(2)
        for i in range(5):
            msg = "hello %s" % i
            # submit 5 tasks; apply_async returns immediately without waiting for them
            pool.apply_async(func=f, args=(msg,))
    
        print "---------------------"
        pool.close()
        pool.join()
    

      

      Result: the separator line prints immediately, then the five tasks run two at a time, so their start and end messages interleave.

    Blocking process pool: apply

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __Author__ = 'kongZhaGen'
    from multiprocessing import Pool
    import time
    
    def f(msg):
        print "start:",msg
        time.sleep(2)
        print "end  :",msg
        print "============"
    
    if __name__ == '__main__':
        # the pool runs at most 2 worker processes at a time
        pool = Pool(2)
        for i in range(5):
            msg = "hello %s" % i
            # apply blocks until each task finishes, so the 5 tasks run one after another
            pool.apply(func=f, args=(msg,))
    
        print "---------------------"
        pool.close()
        pool.join()
    

      Result: each task prints its start and end before the next one begins; the separator line prints only after all five have finished.

     

  • Original article: https://www.cnblogs.com/kongzhagen/p/6707021.html