zoukankan      html  css  js  c++  java
  • python day11


    一、线程
            基本使用
            线程锁
            自定义线程池
           
            生产者消费者模型(队列)
    二、进程
            基本使用
            进程锁
            进程数据共享
                默认数据不共享
                queues
                array
                Manager.dict
            进程池
       
        PS:
            IO密集型-多线程
            计算密集型 - 多进程
       
    三、协程
            原理:利用一个线程,分解一个线程成为多个“微线程”==》程序级别
            greenlet
           
            gevent
           
            pip3 install gevent
    四、缓存
            1、安装软件
            2、程序:安装其对应的模块
            Socket连接,
            memecach
                1、天生集群
                2、基本
                3、gets,cas
               
                k -> ""
            redis
                k -> ""
                k -> [11,11,22,33,44]
                k -> {"k1":xxx}
                k -> [11,22]
                k -> [(11,1),(13,2),]


    #使用多线程
    import threading
    def f1(arg):
        print(arg)
    t = threading.Thread(target=f1,args=(123,))
    t.start()


    import threading
    from time import ctime,sleep
    import time
    def music(func):
        for i in range(2):
            print("我在听--------- %s. %s" %(func,ctime()))
            time.sleep(1)

    def move(func):
        for i in range(2):
            print("我在看---------- %s! %s" %(func,ctime()))
            time.sleep(5)

    threads = []
    t1 = threading.Thread(target=music,args=(u'爱情买卖',))  #把music加入到多线程
    t2 = threading.Thread(target=move,args=(u'阿凡达',))     #把move加入到多线程
    threads.append(t1)
    threads.append(t2)

    for i in threads:
        i.start()


    import threading
    from time import ctime,sleep
    import time
    def music(func):
        for i in range(2):
            print("我在听--------- %s. %s" %(func,ctime()))
            #time.sleep(1)

    def move(func):
        for i in range(2):
            print("我在看---------- %s! %s" %(func,ctime()))
            #time.sleep(5)

    t1 = threading.Thread(target=music,args=(u'爱情买卖',))  #把music加入到多线程
    t2 = threading.Thread(target=move,args=(u'阿凡达',))     #把move加入到多线程
    t1.start()
    sleep(2)
    t2.start()

    同时做两件事
    #听歌看片一起干,想要干两次,此程序只干一次,因为setDaemon不等子线程
    import threading
    from time import ctime,sleep
    def music(func):
        for i in range(2):
            print("我在听-------- %s. %s" %(func,ctime()))
            sleep(1)

    def move(func):
        for i in range(2):
            print("我在看-------- %s! %s" %(func,ctime()))
            sleep(5)

    threads = []
    t1 = threading.Thread(target=music,args=(u'爱情买卖',))  #把music加入到多线程
    threads.append(t1)
    t2 = threading.Thread(target=move,args=(u'阿凡达',))     #把move加入到多线程
    threads.append(t2)

    if __name__ == '__main__':
        for t in threads:
            t.setDaemon(True)       #子线程启动后,父线程也继续执行下去,当父线程执行完最后一条语句print "all over %s" %ctime()后,没有等待子线程,直接就退出了,同时子线程也一同结束。
            t.start()               #开始启动线程
        print("两件事同时做完----------- %s" %ctime())

    多线程完整程序,同时听了歌又看了片
    import threading
    from time import ctime,sleep
    def music(func):
        for i in range(2):
            print("我在听---------- to %s. %s" %(func,ctime()))
            sleep(1)
    def move(func):
        for i in range(2):
            print("我在看---------- to %s! %s" %(func,ctime()))
            sleep(5)
    threads = []
    t1 = threading.Thread(target=music,args=(u'牛逼存在',))
    threads.append(t1)
    t2 = threading.Thread(target=move,args=(u'钟馗伏魔',))
    threads.append(t2)
    if __name__ == '__main__':
        for t in threads:
            t.setDaemon(True)
            t.start()
        t.join()        #等待进程终止,子程进程没有运行完,就不执行父进程
        print("所有结束--------------------- %s" %ctime())

    # 结果
    我在听---------- to 牛逼存在. Mon Jul 18 00:18:22 2016
    我在看---------- to 钟馗伏魔! Mon Jul 18 00:18:22 2016
    我在听---------- to 牛逼存在. Mon Jul 18 00:18:23 2016
    我在看---------- to 钟馗伏魔! Mon Jul 18 00:18:27 2016
    所有结束--------------------- Mon Jul 18 00:18:32 2016

    #自定义多线程,调用
    import threading
    class MyThread(threading.Thread):
        def __init__(self,func,args):
            self.func = func
            self.args = args
            super(MyThread,self).__init__()          #super继承调用父类,解决多继承的重复调用问题
        def run(self):                            # 自动执行run方法
            self.func(self.args)
    def f2(arg):
        print(arg)
    obj = MyThread(f2,123)
    obj.start()

    #结果
    123

    消息队列

    # queue.Queue(2) 先进先出队列
    # put放数据,是否阻塞,阻塞时的超时事件
    # get取数据(默认阻塞),是否阻塞,阻塞时的超时事件
    # qsize()真实个数
    # maxsize 最大支持的个数
    # join,task_done,阻塞进程,当队列中任务执行完毕之后,不再阻塞
     
    q = queue.Queue(2)                       #只接收两个队列
    print(q.empty())
    q.put(11)
    q.put(22)
    print(q.empty())
    print(q.qsize())
    q.put(22)
    q.put(33, block=False)
    q.put(33,block=False, timeout=2)
    print(q.get())
    print(q.get())
    print(q.get(timeout=2))

    ----结果
    True
    False
    2                    # 阻塞了,只接收了两个


    import queue
    # queue.Queue,先进先出队列
    # queue.LifoQueue,后进先出队列
    # queue.PriorityQueue,优先级队列
    # queue.deque,双向对队

    #后进先出队列
    q = queue.LifoQueue()
    q.put(123)                #放数据给队列
    q.put(456)
    print(q.get())            #取一条数据
    print(q.get())            #取第二条数据


    # #优先级队列
        #数值小的高
    q = queue.PriorityQueue()
    q.put((1,"alex1"))      #存数据给队列,优化级为1
    q.put((1,"alex2"))
    q.put((1,"alex3"))
    q.put((3,"alex3"))
    print(q.get())           #取数据

    #双向对队
    q = queue.deque()
    q.append(123)
    q.append(333)
    q.appendleft(456)
    q.pop()
    q.popleft()

    队列机制

    import queue

    queue.Queue #先进先出队列
    queue.LifoQueue  #后队进先出队列
    queue.PriorityQueue #优先级队列
    queue.deque #双向对队

    q = queue.LifoQueue()
    q.put(123)
    q.put(456)
    print(q.get())

    q = queue.PriorityQueue()
    q.put((1,"alex1"))
    q.put((1,"alex2"))
    q.put((1,"alex3"))
    q.put((3,"alex3"))
    print(q.get())

    q = queue.deque()
    q.append(123)
    q.append(333)
    q.appendleft(456)
    q.pop()
    q.popleft()


    生产者消费者
    消息队列解决供给问题,解决阻塞,解耦

    import queue
    import threading
    import time

    q = queue.Queue()                      #先进先出队列

    def productor(arg):                      # 生产者
        """
        买票
        :param arg:
        :return:
        """
        q.put(str(arg) + '-包子')         #发包子器

    for i in range(300):                  #300个人买包子
        t = threading.Thread(target=productor,args=(i,))
        t.start()

    def consumer(arg):                     # 消费者
        """
        服务器后台
        :param arg:
        :return:
        """
        while True:                         #无限生产
            print(arg,q.get())          
            time.sleep(2)
    for j in range(3):                     #3个人一直生产       
        t = threading.Thread(target=consumer,args=(j,))
        t.start()


       
       
    # 线程锁

    #  threading.RLock和threading.Lock


    # threading.Lock   产生死锁阻塞
    import threading 
    lock = threading.Lock() #Lock对象 
    lock.acquire() 
    lock.acquire()     #产生了死锁。 
    lock.release() 
    lock.release() 

    -----结果
                #一直循环不退出,因为锁住了
               
    # threading.RLock          #在同一线程内,程序不会堵塞。
    import threading
    rLock = threading.RLock()  #RLock对象
    rLock.acquire()
    rLock.acquire()            #在同一线程内,程序不会堵塞。
    rLock.release()
    rLock.release()

    -----结果
    Process finished with exit code 0      #不阻塞

    # 上锁和解锁,递归10减到0   
    import threading
    import time

    NUM = 10

    def func(l):
        global NUM
        # 上锁
        l.acquire()
        NUM -= 1
        time.sleep(2)
        print(NUM)
        # 开锁
        l.release()
    #lock = threading.Lock()
    lock = threading.RLock()      

    for i in range(10):
        t = threading.Thread(target=func,args=(lock,))
        t.start()


       
    -----结果

    全锁,全放

    #红灯停,绿灯放行
    import threading

    def func(i,e):
        print(i)
        e.wait() # 检测是什么灯,如果是红灯,停;绿灯,行
        print(i+100)

    event = threading.Event()

    for i in range(10):
        t = threading.Thread(target=func, args=(i,event,))
        t.start()
    #========
    event.clear() # 设置成红灯
    inp = input('>>>')
    if inp == "1":                   #输入1放行
        event.set() # 设置成绿灯  

       
       
       
    ------结果
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    >>>1
    100
    103
    104
    107
    108
    101
    102
    105
    106
    109


    信号量,放几个出去
    import threading
    import time
    NUM = 10

    def func(i,l):
        global NUM
        # 上锁
        l.acquire()
        NUM -= 1
        time.sleep(2)
        print(NUM,i)
        # 开锁
        l.release()
    #lock = threading.Lock()
    #lock = threading.RLock()
    lock = threading.BoundedSemaphore(5)   #信号量,放几个出去

    for i in range(10):
        t = threading.Thread(target=func,args=(i,lock,))
        t.start()


       
       
       
       
    import threading
    def func(i,con):
        print(i)
        con.acquire()
        con.wait()
        print(i+100)
        con.release()

    c = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=func, args=(i,c,))
        t.start()

    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        c.acquire()
        c.notify(int(inp))
        c.release()
       

       
       
       
    import threading

    def condition():
        ret = False
        r = input('>>>')
        if r == 'true':
            ret = True
        else:
            ret = False
        return ret


    def func(i,con):
        print(i)
        con.acquire()
        con.wait_for(condition)
        print(i+100)
        con.release()

    c = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=func, args=(i,c,))
        t.start()

       
       
       
       
       
       
    # 1秒之后打印hello world
    from threading import Timer
    def hello():
        print("hello, world")

    t = Timer(1, hello)      #1秒之后打印hello world
    t.start()  # after 1 seconds, "hello, world" will be print

       
       
       
       
       
       
    #线程池程序

    import queue
    import threading
    import contextlib
    import time

    StopEvent = object()                                        #配置空值,判断空值终止线程

    class ThreadPool(object):

        def __init__(self, max_num, max_task_num = None):      #配置最大线程数和任务最大个数
            """
            初始化函数做三件事:
            1.判断任务最大数是否为真,真则创建队列存任务,假则创建不受限制队列
            2.创建当前已经创建的线程变量
            3.创建当前空闲多少线程变量
            """
            if max_task_num:
                self.q = queue.Queue(max_task_num)             #创建队列,用来装任务
            else:
                self.q = queue.Queue()
            self.max_num = max_num
            self.cancel = False
            self.terminal = False
            self.generate_list = []                            #当前已经创建的线程
            self.free_list = []                                   #当前空闲多少线程

        def run(self, func, args, callback=None):
            """
            线程池执行一个任务
            :param func: 任务函数
            :param args: 任务函数所需参数
            :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
            :return: 如果线程池已经终止,则返回True否则None
            """
            if self.cancel:
                return
            if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:      #判断没有空闲线程和已经创建的线程小于线程池最大数,则创建线程
                self.generate_thread()     # 创建线程
            w = (func, args, callback,)    # 以下两行把任务放入队列   #FUNC 是函数 ARGS、CALLBACK是元组
            self.q.put(w)

        def generate_thread(self):
            """
            创建一个线程
            """
            t = threading.Thread(target=self.call)             #创建线程执行call方法(用call方法创建线程),每创建一个线程都执行call
            t.start()

        def call(self):
            """
            循环去获取任务函数并执行任务函数
            """
            current_thread = threading.currentThread
            self.generate_list.append(current_thread)       #获取已创建的线程存入列表

            event = self.q.get()                            #取任务

            while event != StopEvent:
                '''如果获取的任务不为空,则执行改变其状态,循环配置每个任务为空闲,否则传入的是空值,则清除线程列表'''
                func, arguments, callback = event              #是元组event即任务便执行
                try:
                    result = func(*arguments)                 #传参,执行ACTION函数,ACTION执行完是空闲
                    success = True
                except Exception as e:
                    success = False
                    result = None

                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception as e:
                        pass

                with self.worker_state(self.free_list, current_thread):    #如果执行ACTION,把任务设置成空闲
                    if self.terminal:
                        event = StopEvent
                    else:
                        event = self.q.get()
            else:

                self.generate_list.remove(current_thread)

        def close(self):
            """
            执行完所有的任务后,所有线程停止
            """
            self.cancel = True
            full_size = len(self.generate_list)
            while full_size:
                self.q.put(StopEvent)
                full_size -= 1

        def terminate(self):
            """
            无论是否还有任务,终止线程
            """
            self.terminal = True

            while self.generate_list:
                self.q.put(StopEvent)        # 终止前存入队列

            self.q.empty()


        '''以下用contextlib装饰器模块为记录正在等待的线程数,用于call方法把了队列全部读出来,即实现装饰器为空'''
        @contextlib.contextmanager                                #contoextlib上下文管理,实现类似with的自动关闭机制,其实就是个with封装
        def worker_state(self, state_list, worker_thread):
            """
            用于记录线程中正在等待的线程数
            """
            state_list.append(worker_thread) 
            try:
                yield
            finally:
                state_list.remove(worker_thread)


    pool = ThreadPool(5)                                #创建线程池

    def callback(status, result):
        # status, execute action status
        # result, execute action return value
        pass


    # 创建一个读任务函数
    def action(i):
        print(i)

    for i in range(300):                                  #300个任务
        ret = pool.run(action, (i,), callback)

    # time.sleep(5)
    # print(len(pool.generate_list), len(pool.free_list))
    # print(len(pool.generate_list), len(pool.free_list))

    进程数据共享


    from multiprocessing import Process
    from multiprocessing import queues
    import multiprocessing


    def foo(i,arg):                 #定义函数两个选项,用于后续传参
        arg.put(i)                  #定义写队列参数
        print('say hi',i,arg.qsize())      #打印i写入的队列,arg.qsize统计队列数

    if __name__=='__main__':
        li = queues.Queue(20,ctx=multiprocessing)    #允许20个队列,使用多进程处理队列
        for i in range(10):                          #循环PUT 10次
            p = Process(target=foo,args=(i,li,))     #多进程方式处理
            p.start()                                #启动多进程

           
           
           
           
    进程间数据共享       
    # Array来共享数据       
    from multiprocessing import Process
    from multiprocessing import queues
    import multiprocessing
    from multiprocessing import Array       

    def foo(i,arg):
        # arg.put(i)
        # print('say hi',i,arg.qsize())
        arg[i] = i + 100
        for item in arg:
            print(item)
        print('================')

    if __name__ == "__main__":
        # li = []
        # li = queues.Queue(20,ctx=multiprocessing)
        li = Array('i', 10)                             #将10个进程数据存入Array中
        for i in range(10):
            p = Process(target=foo,args=(i,li,))
            #p.daemon = True
            p.start()
            #p.join()

           
           
           

           

    from multiprocessing import Process
    from multiprocessing import queues
    import multiprocessing
    from multiprocessing import Manager


    def foo(i,arg):
        # arg.put(i)
        # print('say hi',i,arg.qsize())
        # arg[i] = i + 100
        # for item in arg:
        #     print(item)
        # print('====================')
        arg[i] = i + 100
        print(arg.values())

    if __name__=="__main__":
        obj = Manager()
        li = obj.dict()
        for i in range(10):
            p = Process(target=foo,args=(i,li,))
            p.start()
            #p.join()
        import time
        time.sleep(0.1)
       
       
       
       
       
       
       
       
       
       
       
    from multiprocessing import Process
    from multiprocessing import queues
    import multiprocessing
    from multiprocessing import Manager


    def foo(i,arg):
        # arg.put(i)
        # print('say hi',i,arg.qsize())
        # arg[i] = i + 100
        # for item in arg:
        #     print(item)
        # print('====================')
        arg[i] = i + 100
        print(arg.values())

    if __name__=="__main__":
        obj = Manager()
        li = obj.dict()
        for i in range(10):
            p = Process(target=foo,args=(i,li,))
            p.start()
            p.join()
        # import time
        # time.sleep(0.1)


       
    串行没有多线程
    from multiprocessing import Pool
    import time

    def f1(arg):
        time.sleep(1)
        print(arg)

    if __name__ == "__main__":
        pool =  Pool(5)
        for i in range(30):
            pool.apply(func=f1,args=(i,))
        print('end')

    五个一起执行
    from multiprocessing import Pool
    import time

    def f1(arg):
        time.sleep(1)
        print(arg)

    if __name__ == "__main__":
        pool =  Pool(5)
        for i in range(30):
            #pool.apply(func=f1,args=(i,))
            pool.apply_async(func=f1,args=(i,))      #去队列中取任务

        pool.close()
        pool.join()

       
       
       
       

    from multiprocessing import Pool
    import time

    def f1(arg):
        time.sleep(1)
        print(arg)

    if __name__ == "__main__":
        pool =  Pool(5)
        for i in range(30):
            #pool.apply(func=f1,args=(i,))
            pool.apply_async(func=f1,args=(i,))      #去队列中取任务

        time.sleep(2)
        # pool.close()                          #所有的任务执行完毕
        pool.terminate()                      #立即终止
        pool.join()

       
       


    协程
    安装gevent  (WINDOWS按以下方式安装代替pip3 install gevent)
    python3 -m pip install gevent
       
       
       
    http://www.cnblogs.com/wupeiqi/articles/5040827.html
    看11天图


    协程
    # 交叉使用,只使用一个线程,在一个线程中规定某个代码块执行顺序。
    # 协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
    from greenlet import greenlet


    def test1():
        print(12)
        gr2.switch()
        print(34)
        gr2.switch()


    def test2():
        print(56)
        gr1.switch()
        print(78)

    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    gr1.switch()

    -------结果
    12
    56
    34
    78


    协程 :遇到IO操作自动切换
    from gevent import monkey; monkey.patch_all()
    import gevent
    import requests

    def f(url):
        print('GET: %s' % url)
        resp = requests.get(url)
        data = resp.text
        print('%d bytes received from %s.' % (len(data), url))

    gevent.joinall([
            gevent.spawn(f, 'https://www.python.org/'),
            gevent.spawn(f, 'https://www.yahoo.com/'),
            gevent.spawn(f, 'https://github.com/'),
    ])


    -------结果
    GET: https://www.python.org/
    GET: https://www.yahoo.com/
    GET: https://github.com/
    449397 bytes received from https://www.yahoo.com/.
    25533 bytes received from https://github.com/.
    47394 bytes received from https://www.python.org/.


    缓存


    # Memcached安装
    yum -y install libevent
    yum -y install memcached
    service memcached start  #可以不用这句,用下面的

    # Memcached使用
    import memcache
    mc = memcache.Client(['172.16.0.2:11211'], debug=True)
    mc.set("foo", "bar")
    ret = mc.get('foo')
    print(ret)

    # 启动Memcached
    memcached -d -m 10    -u root -l 127.0.0.1 -p 12000 -c 256 -P /tmp/memcached.pid

    http://www.cnblogs.com/wupeiqi/articles/5132791.html

  • 相关阅读:
    【交互稿】规范
    【管理】带人
    【产品文档】一份很不错的产品文档-神策
    没有Iphone也能装逼:让Android版QQ显示成Iphone6
    帝国备份王(Empirebak)万能cookie及拿shell
    mysql syntax bypass some WAF
    主流的国外机房地址
    最新php一句话木马
    手机自带的显示基站命令(android手机定位,iphone基站定位)
    User Agent跨站攻击
  • 原文地址:https://www.cnblogs.com/wangminghu/p/5695269.html
Copyright © 2011-2022 走看看