zoukankan      html  css  js  c++  java
  • Python管道进行数据的吞吐处理

    import multiprocessing
    import random
    import time
    import datetime
    import struct
    import os
    
    import getFile
    
    # 76(28) + (2048 + 16) * 512 + 4
    
    frame_flag_0 = 0x0000000000000001 # 8 byte
    frame_flag_1 = 0x0000000000000002 # 8 byte
    frame_flag_2 = 0x0000000000000003 # 8 byte
    frame_flag_9 = 0x00000001 # 4 byte
    
    dir_name = 'Z:/'
    file_out = 'Z:/312.dat'
    
    def producer(pipe):
        print('start time: ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        linestr = getFile.getFileList(dir_name, '_312_')
        
        for i in range(0, len(linestr)):
            file_in = linestr[i]
            f_in = open(file_in, 'rb')
            f_in.seek(109)
            while True:
                buff = f_in.read(9)
                buff = f_in.read(1792)
                if buff:
                    pipe.send_bytes(buff)
                else:
                    f_in.close()
                    break
                    
        pipe.close()
        
        
    def consumer(pipe):
        f_out = open(file_out, 'xb')
        mm = 0
        while True:
            try:
            
                item = pipe.recv_bytes(1792)
                if item:
                    if mm % 512 == 0: # 开始的首帧
                        f_out.write(struct.pack('>3Q', frame_flag_0, frame_flag_1, frame_flag_2))
                        f_out.write(struct.pack('>I', frame_flag_9))
                        f_out.write(item)
                        f_out.write(struct.pack('>I', 0x00000000) * 68) # 272 byte : 2048 - (896*2) + 16
                        mm = 0
                    elif mm % 512 == 511: # 结尾的结束帧
                        f_out.write(item)
                        f_out.write(struct.pack('>I', 0x00000000) * 68) # 272 byte
                        f_out.write(struct.pack('>I', 0x00000000)) # 4 byte 包尾
                        f_out.flush()
                    else:
                        f_out.write(item)
                        f_out.write(struct.pack('>I', 0x00000000) * 68) # 272 byte
                    mm += 1
                    
            except EOFError:
                f_out.write(struct.pack('>I', 0x00000000) * 516 * (512 - mm)) # (2048 + 16)/4 = 516
                f_out.write(struct.pack('>I', 0x00000000)) # 4 byte 包尾
                f_out.close()
                print('end time: ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                break
                
                
    if __name__ == "__main__":
        pipe = multiprocessing.Pipe()
        process_producer = multiprocessing.Process(target = producer, args = (pipe[0],))
        process_consumer = multiprocessing.Process(target = consumer, args = (pipe[1],))
        process_producer.start()
        process_consumer.start()
        pipe[0].close()
        process_producer.join()
        process_consumer.join()
        
        print('OK')
  • 相关阅读:
    foreach next 操作数组指针移动问题,多个数连加,连除,连减,连乘php版本
    mysql 5.7 laravel json类型数据相关操作
    rbac权限控制,基于无线分类
    基于bootstrap-treeview做的一个漂亮的无限分类树层级联动菜单
    css页面字体替换源代码和页面显示不一样问题解决
    centos6.8 编译安装lnmp php7.2 mysql5.6 nginx1.1.4
    mysql5.7采坑
    laravel整合vue 多入口解决
    使用mysql设计一个全局订单生产计数器
    laravel整合workerman做聊天室
  • 原文地址:https://www.cnblogs.com/mikew/p/12111032.html
Copyright © 2011-2022 走看看