zoukankan      html  css  js  c++  java
  • python之day11(线程,进程,协程,memcache,redis)

    一 线程
        基本使用

      

    import threading
    
    def f1(arg):
        print(arg)
    
    t = threading.Thread(target=f1,args=("hello",))  #target 调用的 函数名 ,args 写入的参数
    t.start()                                                            #开始等待线程调用
    

     自定义线程类:

    import threading
    
    class MyThread(threading.Thread):      #建立类,继承threading.Thread
        def __init__(self, func, args):
            self.func = func
            self.args = args
            super(MyThread, self).__init__()    #使用父类的__init__函数
        def run(self):
            self.func(self.args)
    
    def f2(arg):
        print(arg)
    
    obj = MyThread(f2,123)     #类似于t = threading.Thread(target=f2,args=(123,))
    obj.start()    
    

     
        线程锁

        基本线程锁使用:

    import threading
    import time
    
    NUM = 10
    
    def func(l):
        global NUM
        #上锁
        l.acquire()
        NUM -= 1
        time.sleep(2)
        print(NUM)
        #开锁
        l.release()
    
    lock = threading.RLock()              创建对象 锁
    
    
    for i in range(10):
        t = threading.Thread(target=func, args = (lock, ))
        t.start()
    
    
    #一个数字一个数字的 顺序打印 中间间隔1秒
    

        自控线程锁:

    import threading
    
    def condition():
        ret = False
        r = input('>>>')
        if r == 'true':
            ret = True
        else:
            ret = False
        return ret
    
    
    def func(i,con):
        #print(i)                             #先打印一边i
        con.acquire()                    #锁定
        con.wait_for(condition)      #如果返回true
        print(i+100)                      #打印i+100
        con.release()                     #解锁
    
    c = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=func, args=(i,c,))
        t.start()        
    

      事件型线程锁:

    import threading
    
    def func(i,e):
        print(i)
        e.wait()  #检测是否是什么灯   默认是红灯 所以现在是红灯
        print(i+100)   #绿灯就执行i+100         是绿灯之后直接执行+100
    
    event = threading.Event()    
    
    for i in range(10):
        t =threading.Thread(target=func, args=(i, event,))
        t.start()
    
    event.clear()  #设置成红灯     
    
    inp = input(">>>")
    if inp == "1":
        event.set()  #设置成绿灯
    

    threading.BoundedSemaphore(5)

    锁住线程的数量

    import threading
    import time
    
    NUM = 10
    
    def func(i, l):
        global NUM
        l.acquire()
        NUM -= 1
        time.sleep(2)
        print(NUM)
        #开锁
        l.release()
    
    lock = threading.BoundedSemaphore(5)      #锁5个线程
    
    
    for i in range(10):
        t = threading.Thread(target=func, args = (i, lock, ))
        t.start()
    
    threading.Condition()
    全部锁住,然后根据数量慢慢解锁:
    import threading
    
    def func(i,con):
        print(i)
        con.acquire()
        con.wait()
        print(i+100)
        con.release()
    
    c = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=func, args=(i, c, ))
        t.start()
    
    while True:
        inp = input(">>>")
        if inp  == "q":
            break
        c.acquire()
        c.notify(int(inp)) #如果数量是1 就出现0+100  是2 就是打印出 1+100 和2+100 两个,知道都执行完成
        c.release()
    

    通过时间来锁线程:

    timer:

    from threading import Timer
    
    def hello():
        print("hello world")
    
    t = Timer(0.1,hello)                         #0.1 是多长时间解锁
    t.start()
    

     PS:threading.RLock和threading.Lock的区别

     1 在threading模块中,定义两种类型的锁:threading.Lock和threading.RLock。它们之间有一点细微的区别,通过比较下面两段代码来说明:
     2 
     3     import threading  
     4     lock = threading.Lock() #Lock对象  
     5     lock.acquire()  
     6     lock.acquire()  #产生了死锁。  
     7     lock.release()  
     8     lock.release()  
     9 
    10     import threading  
    11     rLock = threading.RLock()  #RLock对象  
    12     rLock.acquire()  
    13     rLock.acquire() #在同一线程内,程序不会堵塞。  
    14     rLock.release()  
    15     rLock.release()  
    16 
    17 
    18   这两种琐的主要区别是:RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁。threading.Condition
    19 
    20   可以把Condiftion理解为一把高级的锁,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个锁对象(默认是RLock),可以在创建Condigtion对象的时候把锁对象作为参数传入。Condition也提供了acquire, release方法,其含义与锁的acquire, release方法一致,其实它只是简单的调用内部锁对象的对应的方法而已。Condition还提供了如下方法(特别要注意:这些方法只有在占用锁(acquire)之后才能调用,否则将会报RuntimeError异常。):
    21 Condition.wait([timeout]):  
    22 
    23   wait方法释放内部所占用的锁,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有锁的时候,程序才会继续执行下去。
    24 Condition.notify():
    25 
    26   唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的锁。
    27 Condition.notify_all()
    28 Condition.notifyAll()
    29 
    30   唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的锁。
    threading.RLock和threading.Lock区别


        **自定义线程池

    import queue
    import threading
    import time
    
    class ThreadPool():
        def __init__(self, maxclient = 5 ):
            self.maxclient = maxclient
            self._q = queue.Queue(maxclient)        #建立队列
            for i in range(maxclient):
                self._q.put(threading.Thread)          #往队列中放一个线程
                #[threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread]
    
        def get_thread(self):
            return self._q.get()                            #取出队列中的一个数据(数据就是一个线程),返回threading.Thread这个类名
    
        def add_thread(self):
            self._q.put(threading.Thread)             #再将一个数据加入队列(数据就是一个线程)
    
    pool = ThreadPool(5)              #创建对象
    
    def task(arg, p):
        print(arg)
        time.sleep(1)
        p.add_thread()              #执行完一个线程,再将令牌还回队列中,就在队列中再加一个线程,可以写成pool.add_thread()
    
    for i in range(100):              #一共100个任务
        t = pool.get_thread()                 # 创建一个线程
        obj = t(target=task, args=(i, pool))   #规定好要执行的函数和参数
        obj.start()                        #开始执行,**一定不能忘
    

      先进先出队列:


        import queue
        q = queue.Queue(10)
        # 队列的最大长度是10
        q.put(11)    
        #put放数据,是否阻塞,阻塞时的超时时间
        q.put(22)
        q.put(33, block = False, timeout = 2)
        # 33是数据,2是超时时间,
        #2秒钟后 如果有位置可以加入队列
        # block = False 不阻塞。
        print(q.qsize())
        #qsize() 真实个数
        #maxsize() 最大个数
        #join, task_done, 阻塞进程,当队列中任务执行完毕之后,不再阻塞
        print(q.get())
        #get是取数据,默认阻塞 阻塞时的超时时间

    import queue
    
    q = queue.Queue(2)                         #限制为2个数据的队列
    print(q.empty())                               #打印队列中是否插入数据了。返回True 或者False
    q.put(111)                                        #插入 111
    q.put(222)                                         #插入222
    print(q.qsize())                                  #显示有几个数据 2个
    q.get()                                              #获取第一个数据
    q.task_done()                                    #获取完毕
    q.get()                                              #获取第二个数据
    q.task_done()                                     #获取完毕
    print(q.qsize())                                    #再看看队列中的数据个数
    
    q.join()                                               主线程等待子线程执行完毕在关闭
    

    其他队列:

    import queue
    
    q = queue.LifoQueue()      #last in  front out = Lifo  后进先出
    q.put(123)                       
    q.put(234)
    print(q.get())
    
    q = queue.PriorityQueue()  # 1 0 6 是权重,数字越小优先级越高  如果权重相同就按照先后顺序
    q.put((1,"1"))
    q.put((0,"2"))
    q.put((6,"3"))
    print(q.get())                     #因此先出 “2”
    
    q = queue.deque()            #双向进出队列
    q.append(123)                  #默认往右加数据 
    q.append(456)
    q.appendleft(100)              #特定从左加    排列就是   456  123  100
    print(q.pop())                    #取第一个数据  456 
    print(q.popleft())                #向左取数据   100
    

    生产者消费者模型(队列)
     做包子

    import queue
    import threading
    import time
    
    q = queue.Queue(20)                          #队列最大是20个数据
    def productor(arg):                              
        '''
        生产者
        :param arg:
        :return:
        '''
        while True:                                        #循环做包子
            q.put(str(arg) + "-包子")                 #做出一个包子
            time.sleep(1)                                  #耗时1秒钟
    
    def consumer(arg):                                #买包子的
        while True:                                        #排队买包子
            print(arg , q.get())      #打印出arg第记号顾客,q.get() 打印出队列中的数据(厨师标号 + "-包子")
            time.sleep(1)              #1秒钟吃完再去排队
    
    for i in range(3):                   #3个厨师做包子
        t = threading.Thread(target = productor, args = (i, ))            
        t.start()
    
    for j in range(20):                 #20个人排队买包子
        t = threading.Thread(target = consumer, args = (j, ))
        t.start()
    

    二 进程
        基本使用

       进程之间是不能互相通讯的 

       

    from multiprocessing import Process
    
    import threading
    
    import time
    def foo(i):
    
        print('say hi',i)
    
    
    if __name__ == "__main__":            #windows 必须增加这行 linux 不需要
        for i in range(10):
            p = Process(target=foo,args=(i,))
            p.start()
    

        进程锁
        进程数据共享
            默认数据不共享
            *queues

    from multiprocessing import Process
    from multiprocessing import queues
    import multiprocessing
    
    def foo(i, arg):
        arg.put(i)
        print("say hi ", i, arg.qsize())
    
    
    if __name__ == "__main__":
        li = queues.Queue(20, ctx=multiprocessing)    #完成进程间的传递信息
        for i in range(10):
            p = Process(target=foo, args=(i, li))
            p.start()
            p.join()
    


            *array 数组格式:

    from multiprocessing import Process
    # from multiprocessing import queues
    import multiprocessing
    from multiprocessing import Array   #数组
    
    def foo(i, arg):
        arg[i] = i + 100
        # print(arg)
        for item in arg:
            print(item)
        print("============")
    
    if __name__ == "__main__":
        li = Array("i", 10)                      #i 是数 int  只能是“i” 数组中只能是数字。
        for i in range(10):
            p = Process(target=foo, args=(i, li,))
            p.start()
    
        'c': ctypes.c_char,  'u': ctypes.c_wchar,
        'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
        'h': ctypes.c_short, 'H': ctypes.c_ushort,
        'i': ctypes.c_int,   'I': ctypes.c_uint,
        'l': ctypes.c_long,  'L': ctypes.c_ulong,
        'f': ctypes.c_float, 'd': ctypes.c_double
    数组类型对应表


            *Manager.dict

    from multiprocessing import Manager
    
    def foo(i, arg):
        arg[i] = i + 100
        print(arg.values())
    
    if __name__ == "__main__":
        obj = Manager()
        li = obj.dict()
        for i in range(10):
            p = Process(target=foo, args=(i,li,))
            p.start()
    
        import time
        time.sleep(1.1)
    

     
            pipe
            
        进程池
            apply-- 串行操作
            apply.async -- 异步进行,并行操作

    from multiprocessing import Pool
    import time
    
    def f1(arg):
        time.sleep(1)
        print(arg)
    
    if __name__ == "__main__":
        pool = Pool(5)
    
        for i in range(30):
            # pool.apply(func=f1, args=(i,))       #串行进行 1个1个执行
            pool.apply_async(func=f1,args=(i,))  #异步进行 ,5个5个执行
    
        pool.close()   #所有的任务执行完毕
        # time.sleep(1)
        # pool.terminate()  #立即终止
        pool.join()
    

       pool.close() #所有的任务执行完毕

       pool.terminate() #立即终止

       
        PS:
            IO 密集型--多线程
            计算密集型-- 多进程
        
        
    三 协程
        原理:利用一个线程,分解一个线程成为多个”微线程“
        greenlet
        gevent
        封装了greenlet
        pip3 install gevent

    from gevent import monkey; monkey.patch_all()
    import gevent
    import requests
    
    def f(url):
        print("GET: %s" % url)
        resp = requests.get(url)            #获取url的信息
        data = resp.text
        print("%d bytes received from %s" % (len(data), url))
    
    gevent.joinall([
        gevent.spawn(f, "https://www.python.org/"),  #是一个列表的形式,f是函数名,后面跟函数的参数
        gevent.spawn(f, "https://www.yahoo.com/"),
        gevent.spawn(f, "https://github.com/"),
    ])
    


    四 缓存
        1,安装软件
        2,程序:安装其对应的模块
            Socket 连接
            memcache
                1 天生集群
                2 基本
                3 gets,cas
                k -> ""

      基本使用方法(通过key获取值)

    import memcache
    
    mc = memcache.Client(['192.168.61.128:12000'], debug=True)  #连接远程memcache服务端
    mc.set("foo", "bar")      #设置key 和value 值
    ret = mc.get('foo')        #获取
    print(ret)                     #打印出 value值
    

      天生支持集群模式:

    import memcache
    
    mc = memcache.Client([('192.168.61.128:12000', 1), ('192.168.61.128:12001', 2), ('192.168.61.128:12002', 1)], debug=True)      
     #每一个服务端,通过元组显示,前面是IP和端口,后面是权重。
    # 权重是1 该服务器就出现一次,2就出现两次 现实就是['192.168.61.128:12000','192.168.61.128:12001','192.168.61.128:12001','192.168.61.128:12002'] mc.set('k1', 'v1')

    #根据算法将 k1 转换成一个数字

    #将数字和主机列表长度求余数,得到一个值 N( 0 <= N < 列表长度 )

    #在主机列表中根据 第2步得到的值为索引获取主机,例如:host_list[N]

    #连接 将第3步中获取的主机,将 k1 = "v1" 放置在该服务器的内存中

       add

         添加相同key的值会报错:

    mc = memcache.Client(['192.168.61.128:12000'], debug=True)
    
    mc.add('k1', 'v2')
    
    # mc.add('k1', 'v2') # 报错,对已经存在的key重复添加,失败!!!
    #MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'

        replace

        替换不存在的key值会报错:

    import memcache
    
    mc = memcache.Client(['192.168.61.128:12000'], debug=True)
    
    # 如果memcache中存在kkkk,则替换成功,否则一场
    
    mc.replace('k2','999')
    
    #k2不存在
    #MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'
    

        set 和 set_multi

          set            设置一个键值对,如果key不存在,则创建,如果key存在,则修改
          set_multi   设置多个键值对,如果key不存在,则创建,如果key存在,则修改

    import memcache
    
    mc = memcache.Client(['192.168.61.128:12000'], debug=True)
    
    mc.set('key0', 'wupeiqi')
    mc.set_multi({'key1': 'val1', 'key2': 'val2'})
    
    print(mc.get("key0"))
    print(mc.get("key1"))
    print(mc.get("key2"))
    

        delete 和 delete_multi

          delete             在Memcached中删除指定的一个键值对
          delete_multi    在Memcached中删除指定的多个键值对

    import memcache
    
    mc = memcache.Client(['192.168.61.128:12000'], debug=True)
    
    mc.delete('key0')
    mc.delete_multi(['key1', 'key2'])   #删除  是列表格式
    
    print(mc.get("key0"))
    print(mc.get("key1"))
    print(mc.get("key2"))
    
    #回复3个none
    

        get 和 get_multi

          get            获取一个键值对
          get_multi   获取多一个键值对

    import memcache
    
    mc = memcache.Client(['192.168.61.128:12000'], debug=True)
    mc.set('key0', 'wupeiqi')
    mc.set_multi({'key1': 'val1', 'key2': 'val2'})     #设置是字典来对应key和value
    
    val = mc.get('key0')
    item_dict = mc.get_multi(["key1", "key2", "key3"])
    
    print(val,item_dict)
    
    ##wupeiqi {'key2': 'val2', 'key1': 'val1'}  key3 不存在就没有
    

        append 和 prepend

          append    修改指定key的值,在该值 后面 追加内容
          prepend   修改指定key的值,在该值 前面 插入内容

    import memcache
    
    mc = memcache.Client(['192.168.61.128:12000'], debug=True)
    # k1 = "v1"
    mc.append('k1', 'after')
    # k1 = "v1after"
    
    mc.prepend('k1', 'before')
    # k1 = "beforev1after"
    
    print(mc.get("k1"))
    

        decr 和 incr  

          incr  自增,将Memcached中的某一个值增加 N ( N默认为1 )
          decr 自减,将Memcached中的某一个值减少 N ( N默认为1 )

    import memcache
    
    
    
    mc = memcache.Client(['192.168.61.128:12000'], debug=True)
    mc.set('k1', '777')
    mc.incr('k1')
    # k1 = 778
    print(mc.get('k1'))
    mc.incr('k1', 10)
    print(mc.get('k1'))
    # k1 = 788
    mc.decr('k1')
    print(mc.get('k1'))
    # k1 = 787
    mc.decr('k1', 10)
    print(mc.get('k1'))
    # k1 = 777
    

        gets 和 cas

          避免脏数据,有两个客户同时更改同一个key的value 导致数据不准确。

          解决办法:每次执行gets时,会从memcache中获取一个自增的数字,通过cas去修改gets的值时,会携带之前获取的自增值和memcache中的自增值      进行比较,如果相等,则可以提交,如果不想等,那表示在gets和cas执行之间,又有其他人执行了gets(获取了缓冲的指定值), 如此一来有可能出      现非正常数据,则不允许修改。

    import memcache
    
    mc = memcache.Client(['192.168.61.128:12000'], debug=True, cache_cas=True)
    v = mc.gets('product_count')
    # ...
    # 如果有人在gets之后和cas之前修改了product_count,那么,下面的设置将会执行失败,剖出异常,从而避免非正常数据的产生
    mc.cas('product_count', "899")
    


            redis
                k - >""
                k-> [11,222,33]
                k -> {"k1":xxx}
                k -> {11,22}
                k-> [(11,1),(13,2)]

      

        基本使用方法:

    import redis
    
    r = redis.Redis(host='192.168.61.128', port=6379)
    
    r.set('foo', 'Bar')
    
    print(r.get('foo'))
    
    #b'Bar'
    

       

        

        连接池

          redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连      接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。

    import redis
    
    pool = redis.ConnectionPool(host='192.168.61.128', port=6379)
    
    r = redis.Redis(connection_pool=pool)
    r.set('foo', 'Bar')
    print(r.get('foo'))
    

          

        

  • 相关阅读:
    HTML DOM Select add() 方法
    HTML DOM hasAttributes 方法
    Bootstrap4 表单
    HTML DOM Source 对象
    Java中,分布式session存储解决方案
    string.padStart (String) – JavaScript 中文开发手册
    数组模拟栈
    栈 Stack
    数据库的备份和还原
    数据库设计 — 范式
  • 原文地址:https://www.cnblogs.com/aaron-shen/p/5681202.html
Copyright © 2011-2022 走看看