zoukankan      html  css  js  c++  java
  • 以多进程读取oss符合条件的数据为例,综合使用多进程间的通信、获取多进程的数据

    import datetime
    import sys
    import oss2
    from itertools import islice
    import pandas as pd
    import re
    import json
    from pandas.tseries.offsets import Day
    from multiprocessing import Process, JoinableQueue, cpu_count, Manager
    import time
    
    
    def mkbuck(bk):
    	auth = oss2.Auth(username, password)
    	bucket = oss2.Bucket(auth, address, bk)
    	return bucket
    
    #获取前天最后一小时的paths
    def getbflastpt(bucket, bfyespattern):
    	bfpamax = []
    	for bf in islice(oss2.ObjectIterator(bucket, prefix=bfyespattern), sys.maxsize):
    		c = bf.key
    		if c[-1:] != '/':
    			bfpamax.append(int(c.split('/')[4]))
    	last = pd.Series(bfpamax).unique().max()
    	if last < 10:
    		bflastpt = bfyespattern + '/0' + str(last)
    	else:
    		bflastpt = bfyespattern + '/' + str(last)
    	return bflastpt
    
    #获取当天第一个小时的paths
    def getnowfirstpt(bucket, nowpattern):
    	bfpamin = []
    	for bf in islice(oss2.ObjectIterator(bucket, prefix=nowpattern), sys.maxsize):
    		c = bf.key
    		if c[-1:] != '/':
    			bfpamin.append(int(c.split('/')[4]))
    	first = pd.Series(bfpamin).unique().min()
    	if first < 10:
    		nowfirstpt = nowpattern + '/0' + str(first)
    	else:
    		nowfirstpt = nowpattern + '/' + str(first)
    	return nowfirstpt
    
    #获取所有的昨日paths,并合并得到完全的paths和数量
    def getfullnum(bk, bfyespattern, nowpattern, yespattern):
    	lists = []
    	bucket = mkbuck(bk)
    	bfyespattern = getbflastpt(bucket, bfyespattern)
    	nowpattern = getnowfirstpt(bucket, nowpattern)
    	timelist = (s for s in (bfyespattern, yespattern, nowpattern))
    	for pter in timelist:
    		for bf in islice(oss2.ObjectIterator(bucket, prefix=pter), sys.maxsize):
    			c = bf.key
    			lists.append(c)
    	return lists, len(lists)
    
    #以下为进程间通信,即生产者、消费者模型
    def getfull(bk, bfyespattern, nowpattern, yespattern, q):
    	lists, num = getfullnum(bk, bfyespattern, nowpattern, yespattern)
    	for c in lists:
    		q.put(c)
    	q.join()
    
    
    def consumer(bk, q, d):
    	bucket = mkbuck(bk)
    	repattern2 = re.compile('{.*"adadji",.*}')
    	while True:
    		js = []
    		ress = q.get()
    		if ress[-1:] != '/':
    			remote_data = bucket.get_object(ress).read().decode('utf-8')
    			aa = (d for d in repattern2.findall(remote_data))
    			for a in aa:
    				temdic = json.loads(a)
    				if (starttime <= temdic['created_at']) and (temdic['created_at'] <= endtime):
    					js.append(temdic)
    		df = pd.DataFrame(js, columns=['dd','cc'])
    		d[ress] = df##d为通过主进程Manager共享变量将数据取出
    		# print(ress)
    		q.task_done()# 向q.join()发送一次信号,证明一个数据已经被取走了
    
    
    if __name__ == '__main__':
    	s1 = time.time()
    	now_time = datetime.datetime.now()  # 获取当前时间
    	bfyes_time = (now_time - 2 * Day()).strftime('%Y/%m/%d')
    	yes_time = (now_time - 1 * Day()).strftime('%Y/%m/%d')
    	yesdate = (now_time - 1 * Day()).strftime('%Y-%m-%d')
    	yesdate1 = (now_time - 1 * Day()).strftime('%Y%m%d')
    	endtime = (now_time - 1 * Day()).strftime('%Y-%m-%d 23:59:59')
    	starttime = (now_time - 1 * Day()).strftime('%Y-%m-%d 00:00:00')
    	nowdate = now_time.strftime('%Y/%m/%d')
    	
    	bk = 'xxx'
    	bfyespattern = '%s/%s' % (bk, bfyes_time)
    	yespattern = '%s/%s' % (bk, yes_time)
    	nowpattern = '%s/%s' % (bk, nowdate)
    	
    	q = JoinableQueue(cpu_count())
    	m = Manager()
    	d = m.dict()  ##创建进程间的共享内存字典,方便各个进程处理好的数据
    	p1 = Process(target=getfull, args=('xx', bfyespattern, nowpattern, yespattern, q))
    	#####生成consumer多进程
    	cc = []
    	for c in range(cpu_count() - 1):
    		c1 = Process(target=consumer, args=('xx', q, d))
    		cc.append(c1)
    	
    	p_l = [p1]
    	for c in cc:
    		c.daemon = True
    		p_l.append(c)
    	
    	for p in p_l:
    		p.start()
    	p1.join()
    	d = d.values()
    	df1 = pd.concat(d, ignore_index=True)
    	df1.sort_values('created_at', inplace=True)
    	print(time.time() - s1)
    	print('=' * 20)
    	print(df1)
    

      说明:需求为获取昨日的数据即可,因oss实时数据存储可能存在提前或延迟情况,因此读取前天的最后一小时,昨日全部,当天最开始一小时数据,读者可根据自身情况进行修改

  • 相关阅读:
    SpringBoot+EasyCaptcha实现验证码功能
    Spring boot集成Swagger
    Swagger注释API :@ApiModel
    lombok的@Accessors注解3个属性说明
    lombok——@EqualsAndHashCode(callSuper = true)注解的使用
    Springboot集成分页插件PageHelper
    SprinBoot application.properties配置详情之DataSource
    SpringBoot系列之banner.txt (转)
    C语言基础知识汇总
    Byte、KB、MB、GB、TB、PB、EB是啥以及它们之间的进率
  • 原文地址:https://www.cnblogs.com/mahailuo/p/9293825.html
Copyright © 2011-2022 走看看