zoukankan      html  css  js  c++  java
  • python3 访问 rabbitmq 示例

    关于 rabbitmq

    之前用过 kafka,要是拿这两者做对比的话,大概有以下异同:

    1. 两者都是一个分布式架构
    2. kafka 具有较高的吞吐量,rabbimq 吞吐量较小
    3. rabbitmq 的可靠性更好,确认机制(生产者和 exchange,消费者和队列),支持事务,但会造成阻塞,委托(添加回调来处理发送失败的消息)和备份交换器(将发送失败的消息存下来后面再处理)机制
    4. kafka 常用于日志收集业务,rabbitmq 常用于抢购,支付业务

    rabbitmq demo

    producer

    # coding: utf-8
    import json
    import pika
    
    credentials = pika.PlainCredentials('sm', 'sm')
    connection = pika.BlockingConnection(pika.ConnectionParameters('32.86.5.93', 5672, '/', credentials))
    channel = connection.channel()
    
    # 声明queue,需要注意这里的配置,消费者声明 queue 时需要与生产者保持一致
    channel.queue_declare(queue='viosm', arguments={"x-max-length": 10000})
    
    
    body = json.dumps({"test": "test"})
    # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    # 推送到交换机,因为队列是一进一出的,如果推送到队列,只能有一个消费者来获取(来自毛绒绒胖虫子)
    ret = channel.basic_publish(exchange="smai",
                                routing_key="",
                                body=body)
    print(ret)  # 返回 None
    connection.close()

    consumer

    # coding: utf-8
    import json
    import pika
    
    credentials = pika.PlainCredentials('sm', 'sm')
    connection = pika.BlockingConnection(pika.ConnectionParameters('32.86.5.93', 5672, '/', credentials))
    channel = connection.channel()
    
    # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
    # We could avoid that if we were sure that the queue already exists. For example if send.py program
    # was run before. But we're not yet sure which program to run first. In such cases it's a good
    # practice to repeat declaring the queue in both programs.
    channel.queue_declare(queue='viosm', arguments={"x-max-length": 10000})  # 这里需要与生产者的声明保持一致
    
    
    def msg_consumer(ch, method, properties, data_bytes):
        data_json = data_bytes.decode()
        data_dict = json.loads(data_json)
        print("{}".format(data_dict))
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动提交偏移量
    
    
    channel.basic_consume('viosm',       # 队列名
                          msg_consumer,  # 回调函数
                          consumer_tag="seemmo_consumer",
                          # auto_ack=True,  # 自动提交偏移量
                          )
    
    channel.start_consuming()
    # forever

    ending ~ 

    每天都要遇到更好的自己.
  • 相关阅读:
    JavaScript function (简单总结)
    JavaScript 数组 (简单总结)
    yum update 和 yum upgrate 区别
    git clone警告,提示Warning:Permission denied (publickey)
    ''退格符号笔记
    MySQL Workbench导出Model提示['ERROR 1064 (42000): You have an error in your SQL syntax....syntax to use near 'VISIBLE']
    《Python编程从入门到实践》--- 学习过程笔记(3)列表
    《Python编程从入门到实践》--- 学习过程笔记(2)变量和简单数据类型
    Windows+MyEclipse+MySQL【连接数据库报错caching_sha2_password】
    测试 | 让每一粒尘埃有的放矢
  • 原文地址:https://www.cnblogs.com/kaichenkai/p/11510369.html
Copyright © 2011-2022 走看看