zoukankan      html  css  js  c++  java
  • python中的线程技术

    #!/user/bin/env python
    # @Time     :2018/7/7 11:42
    # @Author   :PGIDYSQ
    #@File      :DaemonTest.py
    import threading,time
    # 1.线程的简单使用
    # class MyThread(threading.Thread):
    #     def __init__(self,num,threadname):
    #         threading.Thread.__init__(self,name=threadname)
    #         self.num = num
    #     def run(self):
    #         time.sleep(self.num)
    #         print(self.num)
    #
    # t1 = MyThread(1,'t1')
    # t2 = MyThread(5,'t2')
    #
    # t2.daemon = True
    # print(t1.daemon)
    # print(t2.daemon)
    #
    # t1.start()
    # t2.start()
    
    #2.多线程使用,线程同步技术
    
    # class MyThread2(threading.Thread):
    #     def __init__(self):
    #         threading.Thread.__init__(self)
    #     #重写run方法
    #     def run(self):
    #         global x
    #         #获取锁,
    #         lock.acquire()#相当于同步语块
    #         x = x+3
    #         print(x)
    #         #释放锁
    #         lock.release()
    #
    # lock = threading.RLock()
    # #存放多个线程列表
    # t12 = []
    # #创建线程
    # for i in range(10):
    #     #创建线程并添加到列表中
    #     t = MyThread2()
    #     t12.append(t)
    # #多个线程互斥访问的变量
    # x = 0
    # #启动所有线程
    # for i in t12:
    #     i.start()
    #3.使用Condition实现生产者与消费者关系
    from random  import randint
    from time import sleep
    #自定义生产者线程类
    class Producer(threading.Thread):
        def __init__(self,threadname):
            threading.Thread.__init__(self,name = threadname)
        def run(self):
            global x
            while True:
                con.acquire()
                if(len(x) == 10):
                    con.wait()
                    print('生产者等待中...')
                else:
                    print('生产者:',end=' ')
                    x.append(randint(1,1000))
                    print('生产的数据:%s'%x)
                    sleep(1)
                    #唤醒等待条件的线程
                    con.notify()
                #释放锁
                con.release()
    #自定义消费者线程类
    class Consumer(threading.Thread):
        def __init__(self,threadname):
            threading.Thread.__init__(self,name = threadname)
        def run(self):
            global x
            while True:
                #获取锁
                con.acquire()
                if not x:
                    con.wait()
                    print("消费者等待中...")
                else:
                    print('消费的数据:%s'%x.pop(0))
                    print('消费后剩余的数据:%s'%x)
                    sleep(2)
                    con.notify()
                con.release()
    
    #创建Condition对象并测试
    con = threading.Condition()
    x = []
    p = Producer("Producer")
    c = Consumer("Consumer")
    
    # p.start()
    # c.start()
    # p.join()
    # c.join()
    #4.还有Queue对象,Event对象使用...
    
    #5.进程的使用
    # from multiprocessing import Process
    # import os
    # from statistics import mean
    #
    # def f(name):
    #     print(os.getppid())
    #     print(name)
    # if __name__ == '__main__':
    #     pro1 = Process(target=f,args=('yao',))
    #     pro1.start()
    #     pro1.join()
    #6.计算二维数组每行的平均值
    # x =[list(range(10)),list(range(20,30)),list(range(40,50)),list(range(70,90))]
    # from multiprocessing import Pool
    # from statistics import mean
    # def f(x):
    #     return mean(x)#求平均值
    # if __name__ == '__main__':
    #     with Pool(5) as p:
    #         print(p.map(f,x))
    
    #7.使用管道实现进程间数据交换
    # from multiprocessing import Process,Pipe
    # def f(conn):
    #     conn.send('hello world')
    #     conn.close()
    # if __name__ == "__main__":
    #     parent_conn,child_conn = Pipe()#创建管道对象,并设置接收端与发送端
    #     p = Process(target=f,args=(parent_conn,))#将管道的一方做为参数传递给子进程
    #     p.start()
    #     p.join()
    #     print(child_conn.recv())#接收管道发送方的信息
    #     child_conn.close()
    #8.使用共享内存实现进程间数据的交换
    # from multiprocessing import Process,Value,Array
    # def f(n,a):
    #     n.value = 3.1415927
    #     for i in range(len(a)):
    #         a[i] = a[i] * a[i]
    # if __name__ == "__main__":
    #     num = Value('d',0.0)        #实型
    #     arr = Array('i',range(10))  #整型
    #     p = Process(target=f,args=(num,arr))    #创建进程对象
    #     p.start()
    #     p.join()
    #     print(num.value)
    #     print(arr[:])
    #9.使用Manager对象实现进程间数据交互,Manager控制拥有list,dict,Lock,RLock,Semaphore,Condition,Event,Queue,Value等对象的服务端进程
    # from multiprocessing import Manager,Process
    # def f(d,l,t):
    #     d['name'] = 'Yaos'
    #     d['age'] = 34
    #     l.reverse()
    #     t.value=23
    # if __name__ == "__main__":
    #     with Manager() as manager:
    #         d = manager.dict()
    #         l = manager.list(range(10))
    #         t = manager.Value('i',0)
    #         p = Process(target=f,args=(d,l,t))
    #         p.start()
    #         p.join()
    #         for item in d.items():
    #             print(item)
    #         print(l)
    #         print(t.value)
    #10.进程同步技术与现场同步技术类似,可以使用Lock,Event对象
    # from multiprocessing import Process,Event
    # def f(e,i):
    #     if e.is_set():
    #         e.wait()
    #         print('hello world',i)
    #         e.clear()
    #     else:
    #         e.set()
    # if __name__ == "__main__":
    #     e = Event()
    #     for num in range(6):
    #         Process(target=f,args=(e,num)).start()
    #11.MapReduce框架的使用---大数据处理
    #一种编程模型,用于大规模数据集(大于1TB)的并行计算
    # import os,os.path,time
    # '''大文件分割'''
    # def FileSplit(sourceFile,targetFile):
    #     if not os.path.isfile(sourceFile):
    #         print(sourceFile,' does not File!')
    #         return
    #     if not os.path.isdir(targetFile):
    #         os.mkdir(targetFile)
    #     tempData = []
    #     number = 1000#切分后的每一个小文件包含number行数据
    #     fileNum = 1
    #     with open(sourceFile,'r',encoding='UTF-8') as srcFile:
    #         dataLine = srcFile.readline().strip()
    #         while dataLine:
    #             for i in range(number):
    #                 tempData.append(dataLine)
    #                 dataLine = srcFile.readline()
    #                 if not dataLine:
    #                     break
    #             desFile = os.path.join(targetFile,sourceFile[0:-4] + str(fileNum) + ".txt")
    #             with open(desFile,'a+',encoding='UTF-8') as f:
    #                 f.writelines(tempData)
    #             tempData = []
    #             fileNum = fileNum + 1
    #
    # if __name__ == "__main__":
    #     FileSplit(sourceFile="test.txt",targetFile="test/SourceFileSplit")
    
    '''12.使用Map处理数据'''
    # import os,re,threading,time
    # def Map(sourceFile):
    #     if not os.path.exists(sourceFile):
    #         print(sourceFile,'does not exist')
    #         return
    #     pattern = re.compile(r'[0-9]{4}/[0-9]{1,2}/[0-9]{1,2}')
    #     result = {}
    #     with open(sourceFile,'r',encoding='UTF-8') as srcFile:
    #         for dataLine in srcFile:
    #             r = pattern.findall(dataLine)#查找符合日期格式的字符串
    #             if r:
    #                 # result.get(r[0],0):get()方法用于返回指定键的对应值,当键不存在时,返回指定特定的值;与setdefault()一样,当键不存在时,新增指定的值
    #                 result[r[0]] = result.get(r[0],0) + 1
    #     desFile = sourceFile.replace('SourceFileSplit','MapFile')[0:-4] + '_map.txt'
    #     with open(desFile,'a+',encoding='UTF-8') as fp:
    #         for k,v in result.items():
    #             fp.write(k+':'+str(v)+'
    ')
    # if __name__ == "__main__":
    #     desFolder = 'test/SourceFileSplit'
    #     files = os.listdir(desFolder)
    #     def Main(i):
    #         Map(desFolder+'\'+files[i])
    #     fileNumber = len(files)
    #     for i in range(fileNumber):#多线程处理
    #         t = threading.Thread(target=Main,args=(i,))
    #         t.start()
    '''13.使用Reducer获取最终数据'''
    # from os.path import isdir
    # from os import listdir
    #
    # def Reduce(sourceFolder,targetFile):
    #     if not isdir(sourceFolder):
    #         print(sourceFolder,'does not exist.')
    #         return
    #     result = {}
    #     #仅处理匹配的文件
    #     allFiles = [sourceFolder + '\' + f for f in listdir(sourceFolder) if f.endswith('_map.txt')]
    #     for f in allFiles:
    #         with open(f,'r',encoding='UTF-8') as fp:
    #             for line in fp:
    #                 line = line.strip()
    #                 if not line:
    #                     continue
    #                 key,value = line.split(':')
    #                 result[key] = result.get(key,0) + int(value)
    #     with open(targetFile,'w',encoding='UTF-8') as fp:
    #         for k,v in result.items():
    #             fp.write(k+':'+str(v)+'
    
    ')
    # if __name__ == "__main__":
    #     Reduce('test\MapFile','test\result.txt')
    
    #14.优化上面MapRedurce处理,需要进行分割文件,直接使用Map,Reduce
    # import os,re,time
    # def Map(sourceFile):
    #     if not os.path.exists(sourceFile):
    #         print(sourceFile,' does not exist.')
    #         return
    #     pattern = re.compile(r'[0-9]{4}/[0-9]{1,2}/[0-9]{1,2}')
    #     result = {}
    #     with open(sourceFile,'r',encoding='UTF-8') as srcFile:
    #         for dataLine in srcFile:
    #             r = pattern.findall(dataLine)
    #             if r:
    #                 print(r[0],',',1)
    # Map('test.txt')
    #15.Redurce文件
    # import os,sys
    # def Reduce(targetFile):
    #     result = {}
    #     for line in sys.stdin:
    #         riqi,shuliang = line.strip().split(',')
    #         result[riqi] = result.get(riqi,0)+1
    #     with open(targetFile,'w',encoding='UTF-8') as fp:
    #         for k,v in result.items():
    #             fp.write(k+':'+str(v)+'
    ')
    # Reduce('result.txt')

     本文旨在学习python中有关线程的基础知识,上述代码都是一个个相关的实例,仅供参考。

  • 相关阅读:
    node path.resolve()和path.join()
    完美替代postman的接口测试工具—— apipost
    localforage indexedDB如何使用索引
    ApiPost V5 升级指南
    Chrome升级到91版本以上后Cookies SameSite问题,IdentityServer4登录不上问题?
    React直接调用Bootstrap的方案
    Golang的module模式下项目组织结构
    Linux部署SpringBoot项目jar包,输出日志到文件并追踪
    mybatis plus 查询语句
    springboot 引入AOP 切面 @Aspect 注解使用
  • 原文地址:https://www.cnblogs.com/ysq0908/p/9362539.html
Copyright © 2011-2022 走看看