zoukankan      html  css  js  c++  java
  • Django自定义指令+mq消息队列的使用

    import pika
    import json
    import logging
    import base64
    
    from rest_framework.exceptions import ParseError
    
    from django.core.management.base import BaseCommand
    
    from device.access_device import parse_access_device_image
    
    logger = logging.getLogger('server.default')
    
    
    class Command(BaseCommand):
        """Django management command that consumes the ``_image`` queue."""

        help = 'Get the value from the message queue and write to queue'

        def handle(self, *args, **options):
            """Build a ``RabbitQueue`` and start consuming from ``_image``.

            Positional and keyword options passed by Django are accepted but
            not used.
            """
            RabbitQueue().pop_queue(queue_name='_image')
    
    
    class RabbitQueue:
        """Blocking RabbitMQ consumer built on pika.

        ``connect`` opens the connection and channel, ``pop_queue`` declares a
        durable queue and consumes from it, and ``callback`` handles each
        delivered message.
        """

        def __init__(self):
            # NOTE(review): broker address and credentials are hard-coded here;
            # consider reading them from project settings.
            self.username = 'admin'
            self.password = 'admin'
            self.host = '127.0.0.1'
            self.port = 5672
            # Kept so the connection can be closed when consuming ends.
            self.connection = None
            self.channel = None

        def connect(self):
            """Open a blocking connection to the broker and create a channel.

            Stores the connection on ``self.connection`` and the channel on
            ``self.channel``.
            """
            credit = pika.PlainCredentials(username=self.username, password=self.password)
            parameters = pika.ConnectionParameters(host=self.host, port=self.port, credentials=credit)
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()

        @staticmethod
        def callback(channel, method, properties, body):
            """Handle one delivered message and acknowledge it on success.

            The body is decoded as JSON, the device/user fields and the
            base64-encoded image are extracted, and the result is handed to
            ``parse_access_device_image``.

            Args:
                channel: channel the message was delivered on; used to ack.
                method: delivery metadata; ``method.delivery_tag`` identifies
                    the message being acknowledged.
                properties: message properties (unused).
                body: raw message bytes containing a JSON document.

            Raises:
                ParseError: re-raised (after logging the payload) when
                    processing raises ``ParseError``. The message is not
                    acknowledged in that case.
            """
            receive = json.loads(body.decode())
            try:
                device_name = receive['device']['detail']
                device_address = receive['device']['scene']
                image = base64.b64decode(receive['image'])
                username = receive['user']['name']
                phone = receive['user']['username']
                # NOTE(review): both branches yield an empty string, so the
                # gender value is always ''. The intended labels look lost;
                # confirm the expected values before changing this.
                gender = '' if receive['user']['gender'] == 1 else ''
                parse_access_device_image(device_name=device_name, device_address=device_address, image=image, username=username, phone=phone, gender=gender)
            except ParseError as err:
                logger.error(receive)
                # Chain the original error so its traceback is not lost.
                raise ParseError('ParseError...') from err
            # The queue is consumed with auto_ack=False, so the message must be
            # acknowledged explicitly or the broker keeps it unacked and
            # redelivers it after a reconnect.
            channel.basic_ack(delivery_tag=method.delivery_tag)

        def pop_queue(self, queue_name, timeout=0):
            """Fetch data from the message queue.

            Connects, declares ``queue_name`` as durable, registers
            ``callback`` with manual acknowledgement and consumes until
            consuming stops or an exception propagates. The connection is
            closed on the way out.

            Args:
                queue_name: name of the queue to declare and consume from.
                timeout: accepted for backward compatibility; currently unused.
            """
            self.connect()
            channel = self.channel

            try:
                channel.queue_declare(queue=queue_name, durable=True)
                channel.basic_consume(on_message_callback=self.callback, queue=queue_name, auto_ack=False)

                channel.start_consuming()
            finally:
                # Release the broker connection instead of leaking it when the
                # consumer stops or the callback raises.
                connection = self.connection
                if connection is not None and connection.is_open:
                    connection.close()
  • 相关阅读:
    python库--pandas--文本文件读取
    python库--flashtext--大规模数据清洗利器
    PyCharm--帮助文档
    Git--命令
    symfony doctrine generate entity repository
    [转]MySQL性能优化的最佳20+条经验
    svn使用
    一致性hash
    JavaScript学习笔记 1
    curl发出请求
  • 原文地址:https://www.cnblogs.com/52-qq/p/11596751.html
Copyright © 2011-2022 走看看