zoukankan      html  css  js  c++  java
  • python多线程

    1 多进程

    # 多进程,
    import os
    import time
    from multiprocessing import Process
    # 启动时必须在 if __name__ 判断下,windows 必须,其他 无限制
    # =================================================
    # def func(args):
    #     print("子进程:",os.getpid())
    #     print("子进程的父进程:",os.getppid())
    #     time.sleep(10)
    #     print("子进程结束")
    # if __name__ =="__main__":
    #     p = Process(target=func,args=(1,)) # 注册 并传入元祖 元祖有一个参数要加逗号
    #     # p是进程对象
    #     p.start() # 开启子进程
    #     print("主进程:", os.getpid())
    #     print("主进程的父进程:",os.getppid()) # cmd 或者是 pycharm
    # 生命周期
    #   主进程长:自己执行完结束
    #   子进程长:等待子进程结束
    # =================================================
    # 多进程中的方法
    # join
    # def fun(arg1,arg2):
    #     print('*'*arg1)
    #     # time.sleep(5)
    #     print('*'*arg2)
    # if __name__ == "__main__":
    #     p = Process(target=fun,args=(10,20))
    #     p.start()
    #     # p.join() # 感知子进程结束
    #     # time.sleep(1)
    #     print("all is stop")
    #
    #     print("最后的语句")
    #     os.walk(r"目录") # 返回 文件夹中文件名字
    # =================================================
    # def fun():
    #     print("xxx")
    # if __name__ == "__main__":
    #     for i in range(10):
    #         p = Process(target=fun)
    #         p.start()
    #         p.join() # 停止for循环 进程结束后继续
    #         print("for")
    #     print("主进程")
    # =================================================
    # 第二种方法
    # class MyProcess(Process):
    #     def __init__(self,args):
    #         super().__init__() # 若要传递参数,需要调用父类init
    #
    #     def run(self):
    #         print("子进程",self.__dict__)
    #         print(self.pid)
    # if __name__ == "__main__":
    #     print("主进程:",os.getpid())
    #     p1 = MyProcess()
    #     p1.start()
    # =================================================
    # 进程之间数据是隔离,命名空间不通
    # def fun():
    #     global n
    #     n= 0
    #     print("pid:%s" %os.getpid(),n)
    # if __name__ == "__main__":
    #     n=100
    #     p = Process(target=fun)
    #     p.start()
    #     p.join()
    #     print(n) # -->100
    # =================================================
    # 多进程tcp连接
    # import socket
    # # 客户端
    # sk = socket.socket()
    # sk.connect(("127.0.0.1",8080))
    # sk.send('N好'.encode("utf8"))
    # msg = sk.recv(1024).decode("utf8")
    # print(msg)
    # sk.close()
    #
    # # 服务端
    # def server(conn):
    #     ret= "你好".encode("utf8")
    #     conn.send(ret)
    #     msg = conn.recv(1024).decode("utf8")
    #     print(msg)
    #     conn.close()
    #
    # sk = socket.socket()
    # sk.bind(("127.0.0.1",8080))
    # sk.listen()
    # if __name__ == "__main__":
    #     while True:
    #         conn, addr = sk.accept()
    #         p = Process(target=server,args=(conn,))
    #         p.start()
    # =================================================
    # 守护进程
    # 默认情况 父进程 等待子进程结束
    # p.daemon = True 在start前,设置为守护进程,守护进程随父进程(代码执行完毕)结束
    #   若父进程在等待 子进程(非守护进程时) ,若父进程代码完毕,守护进程应该结束
    # p.is_alive() 判断进程是否存活
    # p.terminate() 终止进程
    # =================================================
    # 锁
    # 未加锁实例:
    # 火车票
    import json
    import time
    from multiprocessing import Process
    # def show(i):
    #     with open('ticket') as f:
    #         dic = json.load(f)
    #     print('余票: %s'%dic['ticket'])

    def buy_ticket(i):
       with open('ticket') as f:
           dic = json.load(f)
           time.sleep(0.1)
       if dic['ticket'] > 0 :
           dic['ticket'] -= 1
           print('33[32m%s买到票了33[0m'%i)
       else:
           print('33[31m%s没买到票33[0m'%i)
       time.sleep(0.1)
       with open('ticket','w') as f:
           json.dump(dic,f)
    if __name__ == '__main__':
       # for i in range(10):
       #     p = Process(target=show,args=(i,))
       #     p.start()
       for i in range(10):
           p = Process(target=buy_ticket, args=(i))
           p.start()
    # =================================================
    # 锁
    # 加锁实例
    # 火车票
    import json
    import time
    from multiprocessing import Process
    from multiprocessing import Lock

    # def show(i):
    #     with open('ticket') as f:
    #         dic = json.load(f)
    #     print('余票: %s'%dic['ticket'])

    def buy_ticket(i,lock):
       lock.acquire() #拿钥匙进门
       with open('ticket') as f:
           dic = json.load(f)
           time.sleep(0.1)
       if dic['ticket'] > 0 :
           dic['ticket'] -= 1
           print('33[32m%s买到票了33[0m'%i)
       else:
           print('33[31m%s没买到票33[0m'%i)
       time.sleep(0.1)
       with open('ticket','w') as f:
           json.dump(dic,f)
       lock.release()      # 还钥匙

    if __name__ == '__main__':
       # for i in range(10):
       #     p = Process(target=show,args=(i,))
       #     p.start()
       lock = Lock()
       for i in range(10):
           p = Process(target=buy_ticket, args=(i,lock))
           p.start()
    # =================================================
    # =================================================
    # =================================================

    2 信号量_事件

    # 多进程中的组件
    # 一个资源 同一时间 被n个人访问
    import time
    import random
    from multiprocessing import Process,Event
    # ==============================
    # 未用信号量
    # def ktv(i):
    #     print('%s走进ktv'%i)
    #     time.sleep(random.randint(1,5))
    #     print('%s走出ktv'%i)
    # if __name__ == '__main__' :
    #     for i in range(20):
    #         p = Process(target=ktv,args=(i))
    #         p.start()
    # ==============================
    from multiprocessing import Semaphore

    # sem = Semaphore(4)
    # sem.acquire()
    # print('拿到第一把钥匙')
    # sem.acquire()
    # print('拿到第二把钥匙')
    # sem.acquire()
    # print('拿到第三把钥匙')
    # sem.acquire()
    # print('拿到第四把钥匙')
    # sem.acquire()
    # print('拿到第五把钥匙')
    # def ktv(i,sem):
    #     sem.acquire()   #获取钥匙
    #     print('%s走进ktv'%i)
    #     time.sleep(random.randint(1,5))
    #     print('%s走出ktv'%i)
    #     sem.release()   # 释放钥匙
    #
    #
    # if __name__ == '__main__' :
    #     sem = Semaphore(4)
    #     for i in range(20):
    #         p = Process(target=ktv,args=(i,sem))
    #         p.start()

    # ==============================
    # 事件
    #   信号是控制进程阻塞与否
    #   事件创建后,默认是阻塞状态
    # e = Event() # 创建事件
    # e.is_set() # False 默认阻塞
    # print("xx") # 可打印 e.set() 设置为True e.clear() 设置为False
    # e.wait()
    # print("xx") # 阻塞
    # 遇到wait()会判断is_set() 为False 阻塞
    # ==============================
    # 红绿灯事件
    def cars(e,i):
       if not e.is_set():
           print("car%i在等待" % i)
           e.wait()
       print("car%i通过" % i)


    def light(e):
       while True:
           if e.is_set():
               e.clear()
               print("33[31m红灯33[0m")
           else:
               e.set()
               print("33[32m绿灯33[0m")
           time.sleep(2)
    if __name__ == "__main__":
       e = Event()
       p =Process(target=light,args=(e,))
       p.start()
       for i in range(1,21):
           car = Process(target=cars,args=(e,i))
           car.start()
           time.sleep(random.random())
    # ==============================
    # ==============================
    # ==============================
    # ==============================
    # def test(e):
    #     e.set()
    #     print("xxx")
    # if __name__=="__main__":
    #     e = Event()
    #     print(e.is_set())
    #     p = Process(target=test,args=(e,))
    #     p.start()
    #     e.wait()
    #     print("注")

    3 进程通信_队列管道

    # IPC 内部进程通信,不能使用普通queue
    from multiprocessing import Queue,Process
    # ===============================
    # q = Queue(5) # 队列大小
    # q.put(1)
    # q.put(1)
    # q.full() # 若队列满了,阻塞等待
    # q.get()
    # q.empty() # 若为空,阻塞等待有数据 后取值
    # q.get()
    # q.get_nowait() # 用于跳过等待,需要用try
    # ===============================
    # def produce(q):
    #     q.put('hello')
    # def consume(q):
    #     print(q.get())
    # if __name__ =="__main__":
    #     q = Queue()
    #     p = Process(target=produce,args=(q,))
    #     p.start()
    #     p2 = Process(target=consume, args=(q,))
    #     p2.start()
    # ===============================
    # 生产者消费者模型
    # 若生产者,生产有数量,消费者,不停消费,最后消费进程会处于等待状态
    # 可在主进程后边join生产进程,消费进程判断为空,但不准确
    #   需要在队列put(None) 子进程判断,由于数据之间不能共享,需要put 消费数量的None
    # ===============================
    from multiprocessing import JoinableQueue
    # consume :
    #     ....
    #     q.task_done()
    # produce :
    #     ...
    #     q.join()
    # ===============================
    # 循环通知,致使进程结束
    # JoinableQueue
    # 生产者生产,不停不停消费,若q为空 一直等待,
    #   生产者完毕后,会join等待消费值消费完毕,因为是同一个q,一个生产者完毕后,其他还没有完毕q会处于,他会处于阻塞
    #   等待 消费者 全部消费完毕,q.join()会感知,因此 生产进程会结束,主进程最后join生产进程,生产结束
    #   主进程就结束,身为守护进程的子进程也结束
    # import time
    # import random
    # from multiprocessing import Process,JoinableQueue
    # def consumer(q,name):
    #     while True:
    #         food = q.get()
    #         print('33[31m%s消费了%s33[0m' % (name,food))
    #         time.sleep(random.randint(1,3))
    #         q.task_done()     # count - 1
    #
    # def producer(name,food,q):
    #     for i in range(4):
    #         time.sleep(random.randint(1,3))
    #         f = '%s生产了%s%s'%(name,food,i)
    #         print(f)
    #         q.put(f)
    #     q.join()   # 阻塞 直到一个队列中的所有数据 全部被处理完毕
    #
    # if __name__ == '__main__':
    #     q = JoinableQueue(20)
    #     p1 = Process(target=producer,args=('Egon','包子',q))
    #     p2 = Process(target=producer, args=('wusir','泔水', q))
    #     c1 = Process(target=consumer, args=(q,'alex'))
    #     c2 = Process(target=consumer, args=(q,'jinboss'))
    #     p1.start()
    #     p2.start()
    #     c1.daemon = True   # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
    #     c2.daemon = True
    #     c1.start()
    #     c2.start()
    #     p1.join()
    #     p2.join()     # 感知一个进程的结束

    # 在消费者这一端:
       # 每次获取一个数据
       # 处理一个数据
       # 发送一个记号 : 标志一个数据被处理成功

    # 在生产者这一端:
       # 每一次生产一个数据,
       # 且每一次生产的数据都放在队列中
       # 在队列中刻上一个记号
       # 当生产者全部生产完毕之后,
       # join信号 : 已经停止生产数据了
                   # 且要等待之前被刻上的记号都被消费完
                   # 当数据都被处理完时,join阻塞结束

    # consumer 中把所有的任务消耗完
    # producer 端 的 join感知到,停止阻塞
    # 所有的producer进程结束
    # 主进程中的p.join结束
    # 主进程中代码结束
    # 守护进程(消费者的进程)结束
    # ===============================
    # 管道 双向通信工具
    from multiprocessing import Pipe
    # conn,conn2 = Pipe()
    # conn.send("123456") # 不用字节
    # print(conn2.recv()) # 不用指定大小

    def fun(conn):
       conn.send("hello")
    if __name__=="__main__":
       conn1,conn2 = Pipe()
       Process(target=fun,args=(conn1,)).start()
       print(conn1.recv())
    # 管道返回2个连接
    #   conn1, conn2
    # P   发送 接受
    # p2   接受 发送
    # 若只发数据,可关闭一端,若取数据的时候,对面已关闭,则报错,看根据此 终止程序
    from multiprocessing import Pipe,Process
    # def func(conn1,conn2):
    #     conn2.close()
    #     while True:
    #         try :
    #             msg = conn1.recv()
    #             print(msg)
    #         except EOFError:
    #             conn1.close()
    #             break
    #
    # if __name__ == '__main__':
    #     conn1, conn2 = Pipe()
    #     Process(target=func,args = (conn1,conn2)).start()
    #     conn1.close()
    #     for i in range(20):
    #         conn2.send('吃了么')
    #     conn2.close()
    # ===============================
    # from multiprocessing import Lock,Pipe,Process
    # def producer(con,pro,name,food):
    #     con.close()
    #     for i in range(100):
    #         f = '%s生产%s%s'%(name,food,i)
    #         print(f)
    #         pro.send(f)
    #     pro.send(None)
    #     pro.send(None)
    #     pro.send(None)
    #     pro.close()
    #
    # def consumer(con,pro,name,lock):
    #     pro.close()
    #     while True:
    #             lock.acquire() # 不安全主要是recv的时候,因此两端加锁即可
    #             food = con.recv()
    #             lock.release()
    #             if food is None:
    #                 con.close()
    #                 break
    #             print('%s吃了%s' % (name, food))
    # if __name__ == '__main__':
    #     con,pro = Pipe()
    #     lock= Lock()
    #     p = Process(target=producer,args=(con,pro,'egon','泔水'))
    #     c1 = Process(target=consumer, args=(con, pro, 'alex',lock))
    #     c2 = Process(target=consumer, args=(con, pro, 'bossjin',lock))
    #     c3 = Process(target=consumer, args=(con, pro, 'wusir',lock))
    #     c1.start()
    #     c2.start()
    #     c3.start()
    #     p.start()
    #     con.close()
    #     pro.close()

    # from multiprocessing import Process,Pipe,Lock
    #
    # def consumer(produce, consume,name,lock):
    #     produce.close()
    #     while True:
    #         lock.acquire()
    #         baozi=consume.recv()
    #         lock.release()
    #         if baozi:
    #             print('%s 收到包子:%s' %(name,baozi))
    #         else:
    #             consume.close()
    #             break
    #
    # def producer(produce, consume,n):
    #     consume.close()
    #     for i in range(n):
    #         produce.send(i)
    #     produce.send(None)
    #     produce.send(None)
    #     produce.close()
    #
    # if __name__ == '__main__':
    #     produce,consume=Pipe()
    #     lock = Lock()
    #     c1=Process(target=consumer,args=(produce,consume,'c1',lock))
    #     c2=Process(target=consumer,args=(produce,consume,'c2',lock))
    #     p1=Process(target=producer,args=(produce,consume,30))
    #     c1.start()
    #     c2.start()
    #     p1.start()
    #     produce.close()
    #     consume.close()

    # pipe 数据不安全性
    # IPC
    # 加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象

    # 队列 进程之间数据安全的
    #   管道 + 锁

    3_进程通信__数据共享

    from multiprocessing import Manager,Process,Lock
    # 进程之间不能传递数据,通过方法的参数可以传过去,但修改后,无反应,不知道原因
    def main(dict,lock):
       lock.acquire()
       dict['count']-=1
       lock.release()
    if __name__ == "__main__":
       m = Manager()
       l = Lock()
       dict = m.dict({"count":100})
       p_list=[]
       for i in range(50):
           p = Process(target=main,args=(dict,l))
           p.start()
           p_list.append(p)
       for i in p_list:i.join()
       print(dict['count'])

    4_进程池

    # 进程池
    # 上一个例子中50个进程很慢
    #   寄存器 堆栈 文件
    #   操作系统调度,cup切换
    # 高级线程池有数量限定 ,有最低,任务变多的时候 逐步加到最高限制
    import os,time
    from multiprocessing import Pool,Manager
    # ==================================
    # def func2(i):
    #     print(os.getpid(),os.getppid())
    #     i+1
    # def func(list):
    #     list[1]['set'].add(os.getpid())
    #     print(len(list[1]['set']))
    # # 一般超过5个使用pool
    # if __name__ == "__main__":
    #     pid = Manager()
    #     dict1 = pid.dict({"set":set()})
    #     pool = Pool(5)
    #     # 执行不同的任务,map 自带join方法
    #     #pool.map(func2,range(100))
    #     pool.map(func, [[i,dict1]for i in range(100)])
    #     print(len(dict1['set']))
    # """
    # 等于
    #     for i in range(100):
    #         p = Process(target=func,args=(i,))
    #         p.start()
    # """
    # ==================================
    # def fun(n):
    #     print("start fun%s" %n,os.getpid())
    #     time.sleep(1)
    #     print("end fun%s" % n, os.getpid())
    # if __name__ == "__main__":
    #     p = Pool() # 默认cup核心数量
    #     for i in range(10):
    #         #p.apply(fun,args=(i,)) # 同步提交的
    #         p.apply_async(fun,args=(i,)) # 异步提交,真的异步,因此需要join
    #     p.close() # 不再接受新的任务
    #     p.join() # 感知进程池中任务结束 保持 主进程 与子进程同步

    # ==================================
    # import socket
    # from multiprocessing import Pool
    #
    # def func(conn):
    #     conn.send(b'hello')
    #     print(conn.recv(1024).decode('utf-8'))
    #     conn.close()
    #
    # if __name__ == '__main__':
    #     p = Pool(5)
    #     sk = socket.socket()
    #     sk.bind(('127.0.0.1',8080))
    #     sk.listen()
    #     while True:
    #         conn, addr = sk.accept()
    #         p.apply_async(func,args=(conn,))
    #     sk.close()
    #     import socket
    #
    #     sk = socket.socket()
    #     sk.connect(('127.0.0.1', 8080))
    #
    #     ret = sk.recv(1024).decode('utf-8')
    #     print(ret)
    #     msg = input('>>>').encode('utf-8')
    #     sk.send(msg)
    #     sk.close()
    # ==================================
    # 进程池的返回值
    # p = Pool()
    # p.map(funcname,iterable)     默认异步的执行任务,且自带close和join
    # p.apply   同步调用的
    # p.apply_async 异步调用 和主进程完全异步 需要手动close 和 join
    # from multiprocessing import Pool
    # def func(i):
    #     return i*i
    #
    # if __name__ == '__main__':
    #     p = Pool(5)
    #     for i in range(10):
    #         res = p.apply(func,args=(i,))   # apply的结果就是func的返回值
    #         print(res) --> 直接就是返回值
    # ==================================
    # import time
    # from multiprocessing import Pool
    # def func(i):
    #     time.sleep(0.5)
    #     return i*i
    #
    # if __name__ == '__main__':
    #     p = Pool(5)
    #     res_l = []
    #     for i in range(10):
    #         res = p.apply_async(func,args=(i,))   # apply的结果就是func的返回值
    #         res_l.append(res)
    #   若在for 中直接获取res.get()会在成阻塞,程序变同步执行
    #     for res in res_l:print(res.get())# 等着 func的计算结果
    #   调用res.get时返回
    # ==================================
    # import time
    # from multiprocessing import Pool
    # def func(i):
    #     time.sleep(0.5)
    #     return i*i
    #
    # if __name__ == '__main__':
    #     p = Pool(5)
    #     ret = p.map(func,range(100))
    #     print(ret) # -> 直接返回全部,列表返回
    # 自带join,close 最后一起返回
    # ====================================
    # 回调函数 , 回调的函数在主进程调用
    # 对于子进程中再起子进程问题,还不知道
    # 每个进程的回调函数 交给主进程顺序执行
    import os
    from multiprocessing import Pool,Process
    def func2(nn):
       print('in func2',os.getpid())
       print(nn)
    def func3(n):
       print('in func3', os.getpid())
       return n*n
    def func1(n):
       print('in func1',os.getpid())
       p = Pool(5)
       p.apply_async(func3, args=(10,), callback=func2)
       p.close()
       p.join()
       return n*n
    if __name__ == '__main__':
       print('主进程 :',os.getpid())
       p = Pool(5)
       p.apply_async(func1,args=(10,),callback=func2)
       p.close()
       p.join()
    # ===================================================
    import requests
    from urllib.request import urlopen
    from multiprocessing import Pool
    # 200 网页正常的返回
    # 404 网页找不到
    # 502 504
    # 场景:callback 耗时段,远小于网络延时,此时使用,在主进程运行,
    def get(url):
       response = requests.get(url)
       if response.status_code == 200:
           return url, response.content.decode('utf-8')


    def get_urllib(url):
       ret = urlopen(url)
       return ret.read().decode('utf-8')


    def call_back(args):
       url, content = args
       print(url, len(content))


    if __name__ == '__main__':
       url_lst = [
           'https://www.cnblogs.com/',
           'http://www.baidu.com',
           'https://www.sogou.com/',
           'http://www.sohu.com/',
      ]
       p = Pool(5)
       for url in url_lst:
           p.apply_async(get, args=(url,), callback=call_back) # callback 中的参数为 get函数的返回值
       p.close()
       p.join()

    4_线程

     

     

     

    两者之间应该有对应关系1:1 1:n

     

     

    linux 中的nptl 1对1 线程

     

     

    # 同一进程的线程间的数据共享的,共享的 共享的
    #   可通过直接访问全局变量 global,还需要进程同步
    #   创建,切换,撤销 相比进程 消耗小,轻量级
    #   进程:资源分配单位,每个进程 至少一个线程
    #   线程:cup调度单位
    # thread 基本模块,避免使用,可能与threading 冲突
    # threading thread的高级版本
    # Queue 多线程之间共享数据的数据结构
    # 与进程类似,好多方法相同
    import time
    from threading import Thread
    import threading
    # def func(n):
    #     time.sleep(1)
    #     print(n)
    # t = Thread(target=func,args=(12,))
    # t.daemon = True # 成为"守护线程"
    # t.start()
    # print("主线程") # 默认情况等待子线程结束
    # ===================================
    # class MyThread(Thread):
    #     def __init__(self,name):
    #         super().__init__()
    #         self.name = name
    #     def run(self):
    #         # time.sleep(1)
    #         print(self.name)
    # MyThread("段志方").start()
    # ================================
    # GIL 锁的是线程,同一时间 只有一个线程 ,cpython解释器的问题,jpython 就不会
    # 对于io密集型 没什么区别,只要io时会切换即可
    # 但对于多核cup python 同时只能运行一个cup ,其他语言的会运行多个,因此...
    # 即不能通过物理核心数增加速度,不能实现(并行)
    # ============================================
    # 多线程socket 可以input
    # import socket
    # from threading import Thread
    # def chat(conn):
    #     conn.send(b'hello')
    #     msg = conn.recv(1024).decode('utf-8')
    #     print(msg)
    #     conn.close()
    # sk = socket.socket()
    # sk.bind(('127.0.0.1',8080))
    # sk.listen()
    # while True:
    #     conn,addr = sk.accept()
    #     Thread(target=chat,args = (conn,)).start()
    # sk.close()
    #
    # import socket
    # sk = socket.socket()
    # sk.connect(('127.0.0.1',8080))
    # msg = sk.recv(1024)
    # print(msg)
    # inp = input('>>> ').encode('utf-8')
    # sk.send(inp)
    # sk.close()
    # =========================
    # print(threading.current_thread()) # 当前线程
    # print(threading.active_count()) # 全部线程,包括主线程
    # print(threading.enumerate()) # 列表返回全部线程对象
    # ==========================================
    # 守护线程
    # import time
    # from threading import Thread
    # def func1():
    #     while True:
    #         print('*'*10)
    #         time.sleep(1)
    # def func2():
    #     print('in func2')
    #     time.sleep(5)
    #
    # t = Thread(target=func1,)
    # t.daemon = True
    # t.start()
    # t2 = Thread(target=func2,)
    # t2.start()
    # t2.join()
    # print('主线程')

    # (守护进程)随着(主进程代码)的执行结束而结束
    # 守护(线程)会在主线程结束之后等待(其他非守护子线程)的结束才结束

    # 主进程在执行完自己的代码之后不会立即结束 而是等待子进程结束之后 回收子进程的资源
    # import time
    # from multiprocessing import Process
    # def func():
    #     time.sleep(5)
    #
    # if __name__ == '__main__':
    #         Process(target=func).start()
    # =========================================
    # 线程锁 ,与gil无关
    import time
    from threading import Lock,Thread
    # Lock 互斥锁
    # def func(lock):
    #     global n
    #     lock.acquire()
    #     temp = n
    #     time.sleep(0.2)
    #     n = temp - 1
    #     lock.release()
    #
    # n = 10
    # t_lst = []
    # lock = Lock()
    # for i in range(10):
    #     t = Thread(target=func,args=(lock,))
    #     t.start()
    #     t_lst.append(t)

    # for t in t_lst: t.join()
    # print(n)

    # 科学家吃面   还会死锁

    # noodle_lock = Lock()
    # fork_lock = Lock()
    # def eat1(name):
    #     noodle_lock.acquire()
    #     print('%s拿到面条啦'%name)
    #     fork_lock.acquire()
    #     print('%s拿到叉子了'%name)
    #     print('%s吃面'%name)
    #     fork_lock.release()
    #     noodle_lock.release()
    #
    # def eat2(name):
    #     fork_lock.acquire()
    #     print('%s拿到叉子了'%name)
    #     time.sleep(1)
    #     noodle_lock.acquire()
    #     print('%s拿到面条啦'%name)
    #     print('%s吃面'%name)
    #     noodle_lock.release()
    #     fork_lock.release()
    #
    # Thread(target=eat1,args=('alex',)).start()
    # Thread(target=eat2,args=('Egon',)).start()
    # Thread(target=eat1,args=('bossjin',)).start()
    # Thread(target=eat2,args=('nezha',)).start()
    # ===============================================
    from threading import RLock   # 递归锁
    fork_lock = noodle_lock = RLock()
    # 一个钥匙串上的两把钥匙,同一个lock 在一个线程中可又多次acquire
    # 传给其他线程时 不能被acquire
    # def eat1(name):
    #     print(name)
    #     noodle_lock.acquire()           # 一把钥匙
    #     print('%s拿到面条啦'%name)
    #     fork_lock.acquire()
    #     print('%s拿到叉子了'%name)
    #     print('%s吃面'%name)
    #     fork_lock.release()
    #     noodle_lock.release()
    #
    # def eat2(name):
    #     print(name)
    #     fork_lock.acquire()
    #     print('%s拿到叉子了'%name)
    #     time.sleep(1)
    #     noodle_lock.acquire()
    #     print('%s拿到面条啦'%name)
    #     print('%s吃面'%name)
    #     noodle_lock.release()
    #     fork_lock.release()
    # Thread(target=eat1,args=('alex',)).start()
    # Thread(target=eat2,args=('Egon',)).start()
    # Thread(target=eat1,args=('bossjin',)).start()
    # Thread(target=eat2,args=('nezha',)).start()
    # =================================================

    5_线程_信号量_事件_条件_定时器_列队_线程池

    import time
    from threading import Semaphore,Thread
    # ====================================
    # def func(sem,a,b):
    #     sem.acquire()
    #     time.sleep(1)
    #     print(a+b)
    #     sem.release()
    # sem = Semaphore(4)
    # for i in range(10):
    #     t = Thread(target=func,args=(sem,i,i+5))
    #     t.start()
    # ====================================
    # 事件被创建的时候
    # False状态
       # wait() 阻塞
    # True状态
       # wait() 非阻塞
    # clear 设置状态为False
    # set 设置状态为True
    # 数据库 - 文件夹
    # 文件夹里有好多excel表格
       # 1.能够更方便的对数据进行增删改查
       # 2.安全访问的机制
    # 起两个线程
    # 第一个线程 : 连接数据库
           # 等待一个信号 告诉我我们之间的网络是通的
           # 连接数据库
    # 第二个线程 : 检测与数据库之间的网络是否连通
           # time.sleep(0,2) 2
           # 将事件的状态设置为True
    # import time
    # import random
    # from threading import Thread,Event
    # def connect_db(e):
    #     count = 0
    #     while count < 3:
    #         e.wait(0.5)   # 状态为False的时候,我只等待1s就结束
    #         if e.is_set() == True:
    #             print('连接数据库')
    #             break
    #         else:
    #             count += 1
    #             print('第%s次连接失败'%count)
    #     else:
    #         raise TimeoutError('数据库连接超时')
    # def check_web(e):
    #     time.sleep(random.randint(0,3))
    #     e.set()
    # e = Event()
    # t1 = Thread(target=connect_db,args=(e,))
    # t2 = Thread(target=check_web,args=(e,))
    # t1.start()
    # t2.start()
    # ====================================
    # 条件 复杂的锁
    # 条件
    from threading import Condition
    # 条件
    # 锁
    # acquire release
    # 一个条件被创建之初 默认有一个(False)状态
    # False状态 会影响wait一直处于等待状态
    # notify(int数据类型) 造钥匙
    # from threading import Thread,Condition
    # def func(con,i):
    #     con.acquire()
    #     con.wait() # 等钥匙
    #     print('在第%s个循环里'%i)
    #     con.release()
    # con = Condition()
    # for i in range(10):
    #     Thread(target=func,args = (con,i)).start()
    # while True:
    #     num = int(input('>>>'))
    #     con.acquire()
    #     con.notify(num) # 造钥匙
    #     con.release()
    # ====================================
    #定时器
    # import time
    # from threading import Timer
    # def func():
    #     print('时间同步')   #1-3
    # while True:
    #     t = Timer(5,func).start()   # 非阻塞的 ,异步的 ,会把所有的5s在一起
    #     time.sleep(5) # 睡5s 每5s进行意思时间同步
    # ====================================
    # 加锁 麻烦 所以使用队列
    #线程通信
    # queue
    # import queue #直接导入普通queue 是线程安全的
    # q = queue.Queue() # 队列 先进先出
    # q.put()
    # q.get()
    # q.put_nowait()
    # q.get_nowait()
    # q = queue.LifoQueue() # 栈 先进后出
    # q.put(1)
    # q.put(2)
    # q.put(3)
    # print(q.get())
    # print(q.get())
    # q = queue.PriorityQueue() # 优先级队列
    # q.put((1,'a'))
    # q.put((10,'b'))
    # q.put((30,'c'))
    # q.put((1,'d'))
    # q.put((1,'f'))
    # print(q.get())
    # 元祖中的元素按顺序比较,数字越小优先级大,祖父按照ascii越小优先级越大
    # ====================================
    # 线程池
    import time
    # 以前没有线程池
    from concurrent.futures import ThreadPoolExecutor
    # ProcessPoolExecutor 该模块下还有一个进程池,与multi 功能相同
    # submit(fn,*args,**kwargs) 异步提交任务
    # map(fun,*iterables,timeout=None,chunksize - 1) 循环的submit
    # shutdown(wait=True) # 等于原来的 close join 合并
    # result(time=None) 取得结果
    # add_done_callback(fn) 回调函数
    def func(n):
       time.sleep(2)
       print(n)
       return n*n
    def call_back(m):
       print('结果是 %s'%m.result())
    # 若使用进程池 只换ThreadPoolExecutor->ProcessPoolExecutor
    tpool = ThreadPoolExecutor(max_workers=5)   # 默认 不要超过cpu个数*5
    for i in range(20):
       tpool.submit(func,i).add_done_callback(call_back)
    tpool.shutdown()
    # tpool.map(func,range(20)) # 拿不到返回值

    # t_lst = []
    # for i in range(20):
    #     t = tpool.submit(func,i)
    #     t_lst.append(t)
    # tpool.shutdown() # close+join   #
    # print('主线程')
    # for t in t_lst:print('***',t.result()) # 拿返回值

    6_协程

     

    # 进程 多个进程,操作系统负责
    # 线程 不能同一时间多个cup 其他语言可以,但不影响高io
    #   开启线程 创建线程 寄存器 堆栈
    #   关闭一个线程
    # 协程
    #   本质是一个线程
    #   能够在多个任务间切换,不需要寄存器,堆栈切换
    #   任务之间切换时间开销 远小于线程
    #   计算任务之间切换消耗也很大,一般都是遇到io的时候切换
    #   进程(cup数+1)+线程(cup数*5)+协程(500) = 50000
    #   适合爬虫
    # 实现并发的手段
    # import time
    # 实现在 con,pro之间来回切换
    # def consumer():
    #     while True:
    #         x = yield
    #         time.sleep(1)
    #         print('处理了数据 :',x)
    #
    # def producer():
    #     c = consumer()
    #     next(c)
    #     for i in range(10):
    #         time.sleep(1)
    #         print('生产了数据 :',i)
    #         c.send(i)
    #
    # producer()
    # =============================================
    # 真正的协程模块就是使用greenlet完成的切换
    from greenlet import greenlet
    # def eat():
    #     print('eating start')
    #     g2.switch()
    #     print('eating end')
    #     g2.switch()
    #
    # def play():
    #     print('playing start')
    #     g1.switch()
    #     print('playing end')
    # g1 = greenlet(eat) # 必须先有g1 ,g2 函数中才能使用g
    # g2 = greenlet(play) # 不会自动切换
    # g1.switch()
    # ======================================
    # 不能感知time.sleep(1)
    # 可以感知gevent.sleep(1),在第一行引入 如下from...
    # 后边的time 都会经过特殊处理,time.sleep() 就可以被识别
    # from gevent import monkey;monkey.patch_all()
    # import time
    # import gevent
    # import threading
    # def eat():
    #     DummyThread-1 虚拟的线程
    #     print(threading.current_thread().getName())
    #     print(threading.current_thread())
    #     print('eating start')
    #     time.sleep(1)
    #     print('eating end')
    #
    # def play():
    #     DummyThread-2 虚拟的线程
    #     print(threading.current_thread().getName())
    #     print(threading.current_thread())
    #     print('playing start')
    #     time.sleep(1)
    #     print('playing end')
    #
    # g1 = gevent.spawn(eat) # 注册进入,会自动切换,不是操作系统调度
    # g2 = gevent.spawn(play) # gevent 负责协程的调度 通过封装的greenlet switch
    # g1.join() gevent 是完全异步的 join等待协程结束
    # g2.join()
    # 进程和线程的任务切换右操作系统完成
    # 协程任务之间的切换由程序(代码)完成,只有遇到协程模块能识别的IO操作,(时间片等不识别)的时候,程序才会进行任务切换,实现并发的效果
    # ========================================
    # 同步 和 异步
    # from gevent import monkey;monkey.patch_all() # 放最前面
    # import time
    # import gevent
    # def task(n):
    #     time.sleep(1)
    #     print(n)
    # def sync(): # 同步
    #     for i in range(10):
    #         task(i)
    # def async(): # 异步
    #     g_lst = []
    #     for i in range(10):
    #         g = gevent.spawn(task,i)
    #         g_lst.append(g)
    #     gevent.joinall(g_lst)   #两种方法都可
    #     for g in g_lst:g.join()
    # ======================================
    # 协程 : 能够在一个线程中实现并发效果的概念
       #   能够规避一些任务中的IO操作
       #   在任务的执行过程中,检测到IO就切换到其他任务

    # 多线程 被弱化了
    # 协程 在一个线程上 提高CPU 的利用率
    # 协程相比于多线程的优势 切换的效率更快
    # ==========================================
    # 爬虫的例子
    # 请求过程中的IO等待
    # from gevent import monkey;monkey.patch_all()
    # import gevent
    # from urllib.request import urlopen   # 内置的模块
    # urlopen html时有个格式的 reguests 无格式
    # def get_url(url):
    #     response = urlopen(url)
    #     content = response.read().decode('utf-8')
    #     return len(content)
    #
    # g1 = gevent.spawn(get_url,'http://www.baidu.com')
    # g2 = gevent.spawn(get_url,'http://www.sogou.com')
    # g3 = gevent.spawn(get_url,'http://www.taobao.com')
    # g4 = gevent.spawn(get_url,'http://www.hao123.com')
    # g5 = gevent.spawn(get_url,'http://www.cnblogs.com')
    # gevent.joinall([g1,g2,g3,g4,g5])
    # print(g1.value)
    # print(g2.value)
    # print(g3.value)
    # print(g4.value)
    # print(g5.value)

    # ret = get_url('http://www.baidu.com')
    # print(ret)
    # ======================================
    from gevent import monkey;monkey.patch_all()
    import socket
    import gevent
    def talk(conn):
       conn.send(b'hello')
       print(conn.recv(1024).decode('utf-8'))
       conn.close()

    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()
    while True:
       conn,addr = sk.accept()
       gevent.spawn(talk,conn)
    sk.close()

    import socket
    sk = socket.socket()
    sk.connect(('127.0.0.1',8080))
    print(sk.recv(1024))
    msg = input('>>>').encode('utf-8')
    sk.send(msg)
    sk.close()

    7_io模型

    阻塞模型

     

    非阻塞模型

     

    io多路复用

     

     

     

     

     

    # 同步 提交一个任务之后要等待这个任务执行完毕
    # 异步 只管提交任务,不等待这个任务执行完毕就可以做其他事情
    # 阻塞 recv recvfrom accept
    # 非阻塞
    
    # 阻塞   线程   运行状态 --> 阻塞状态 --> 就绪
    # 非阻塞
    
    # IO多路复用
        # select机制  Windows  linux  都是操作系统轮询每一个被监听的项,看是否有读操作
        # poll机制    linux          它可以监听的对象比select机制可以监听的数量多
                                     # 随着监听项的增多,导致效率降低
        # epoll机制   linux           更高级,绑定回调函数,
    # =================================
    # 以前的都是阻塞io
    # =================================
    # 非阻塞io实例
    # import socket
    # sk = socket.socket()
    # sk.bind(('127.0.0.1',9000))
    # sk.setblocking(False)  # 设置不阻塞
    # sk.listen()
    # conn_l = []
    # del_conn = []
    # while True:
    #     try:
    #         conn,addr = sk.accept()  #不阻塞,但是没人连我会报错
    #         print('建立连接了:',addr)
    #         conn_l.append(conn)
    #     except BlockingIOError:
    #         for con in conn_l:
    #             try:
    #                 msg = con.recv(1024)  # 非阻塞,如果没有数据就报错
    #                 if msg == b'':   # 若客户端关闭 会发送空消息
    #                     del_conn.append(con)
    #                     continue
    #                 print(msg)
    #                 con.send(b'byebye')
    #             except BlockingIOError:pass
    #         for con in del_conn:
    #             con.close()
    #             conn_l.remove(con)
    #         del_conn.clear()
    # # while True : 10000   500  501
    #
    # import time
    # import socket
    # import threading
    # def func():
    #     sk = socket.socket()
    #     sk.connect(('127.0.0.1',9000))
    #     sk.send(b'hello')
    #     time.sleep(1)
    #     print(sk.recv(1024))
    #     sk.close()
    #
    # for i in range(2):
    #     threading.Thread(target=func).start()
    # =================================
    # io 多路复用, 监听列表的循环 变为有操作系统执行
    import select
    import socket
    
    sk = socket.socket()
    sk.bind(('127.0.0.1',8000))
    sk.setblocking(False)
    sk.listen()
    
    read_lst = [sk] # 监听列表
    while True:   # [sk,conn]
        # 等待读列表,写列表,修改列表 都必传
        # 返回元祖中3个列表,对应三个list,一般只用第一个
        # r_lst里面就是sk对象
        r_lst,w_lst,x_lst = select.select(read_lst,[],[])
        for i in r_lst:
            if i is sk:
                conn,addr = i.accept()
                read_lst.append(conn)
            else:
                ret = i.recv(1024)
                if ret == b'':
                    i.close()
                    read_lst.remove(i)
                    continue
                print(ret)
                i.send(b'goodbye!')
                import time
                import socket
                import threading
    
    
                def func():
                    sk = socket.socket()
                    sk.connect(('127.0.0.1', 8000))
                    sk.send(b'hello')
                    time.sleep(3)
                    print(sk.recv(1024))
                    sk.close()
                for i in range(20):
                    threading.Thread(target=func).start()
    
    # =================================
    import selectors # 选择合适的多路复用机制
    from socket import *
    
    def accept(sk,mask):
        conn,addr=sk.accept()
        sel.register(conn,selectors.EVENT_READ,read)
    
    def read(conn,mask):
        try:
            data=conn.recv(1024)
            if not data:
                print('closing',conn)
                sel.unregister(conn)
                conn.close()
                return
            conn.send(data.upper()+b'_SB')
        except Exception:
            print('closing', conn)
            sel.unregister(conn)
            conn.close()
    
    sk=socket()
    sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    sk.bind(('127.0.0.1',8088))
    sk.listen(5)
    sk.setblocking(False) #设置socket的接口为非阻塞
    
    sel = selectors.DefaultSelector()   # 自动选择一个适合我的IO多路复用的机制
    sel.register(sk,selectors.EVENT_READ,accept)
    #相当于网select的读列表里append了一个sk对象,并且绑定了一个回调函数accept
    # 说白了就是 如果有人请求连接sk,就调用accrpt方法
    
    while True:
        events=sel.select() #检测所有的sk,conn,是否有完成wait data阶段
        for sel_obj,mask in events:  # [conn]
            callback=sel_obj.data #callback=read
            callback(sel_obj.fileobj,mask) #read(conn,1)
    

    pymysql

    import pymysql
    # 连接
    conn = pymysql.connect(
        host="106.15.39.74",
        port=3306,
        database="test",
        user="root",
        password="dzf123,.",
        charset="utf8" # 没有"-" 没有
    )
    cursor = conn.cursor()
    sql = "select*from student"
    name = "dzf"
    password = "123456"
    sql = "select * from student where name = %s and password = %s"
    ret = cursor.execute(sql,[name,password])
    # 自己拼接需要加引号,使用防注入sql不用加引号,参数不能少,多
    #print(cursor.lastrowid) # 获取刚插入数据的id 应该就是主键 自增的那个,与名字无关
    print(ret) # 返回受影响行数
    ret = cursor.fetchall() # 元祖 大元组里边小元祖
    print(ret,"a")
    ret = cursor.fetchone() # 取一条数据
    print(ret,"a")
    ret = cursor.fetchone()
    print(ret,"a")
    # 直接返回一条元素,格式是 小元祖,或只有list中的一个小字典,外边没有元祖或list
    # 若连续fetchone() 第一次第一条,第二次第二条,一次向下取
    # 若取完后 再次 fetchone() 取不到
    # -->(('dzf','1234'),('dzf','1234'))
    # 在执行语句前 修改cursor格式
    cursor.fetchmany(3) # 在cursor位置接下取3条,大元组中小元祖
    # 移动光标
    cursor.scroll(1,mode="absolute") # 绝对移动 移到1位置,从2开始 ,
    cursor.scroll(1,mode="relative") # 相对移动 原来在3 位置,从4 开始读,现在 移动到4 从5开始读
    # 向上移可以使用负的
    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 指定为字典格式
    """
        [
        {'id':1,'name':'dzf'},
        {'id':1,'name':'dzf'},
        ]
    """
    
    cursor.close()
    conn.close()
    # ====================================
    # 插入数据还是用cursor.execute(),注意提交后conn.commit()
    # 若多语句,可能错误,conn.rollback()
    # sql2 = "insert into student (name,password) values(%s, %s)"
    # ret = cursor.execute(sql2,['123','123'])
    # conn.commit()
    # 或insert into student (name,password) values(%(name)s, %(pwd)s)
    # 下边传入字典excute(sql,{"name":xxx..})
    # ====================================
    # 批量执行
    data = (['12','12'],['23','32'],['32','23']) # 格式必须固定
    cursor.executemany(sql,data)  # 内部的for循环
    # try 防止异常,要回滚, 会取消以前正确的插入语句
    # =================================
    # 删除,同理,也要提交
    # ================================
    # 修改 记得提交
    
  • 相关阅读:
    Android studio快捷键大全 和 eclipse对照(原)
    .net 提取注释生成API文档 帮助文档
    查看443端口被占用无法启动解决办法
    关于正则表达式 C#
    关于 ImageLoader 说的够细了。。。
    什么时候用Application的Context,什么时候用Activity的Context
    关于layoutparam 请铭记。。。。
    java 静态方法上的泛型
    让多个Fragment 切换时不重新实例化
    开源.net 混淆器ConfuserEx介绍
  • 原文地址:https://www.cnblogs.com/Dean0731/p/11661181.html
Copyright © 2011-2022 走看看