zoukankan      html  css  js  c++  java
  • RabbitMQ应用示例

    更多详情参考官方文档:https://www.rabbitmq.com/tutorials/tutorial-six-python.html

    参考博客:https://blog.csdn.net/weixin_41896508/article/details/80997828

     微服务通信RPC

    01-HelloWorld(简单的消息队列)

      send.py  

    import pika
    #与RabbitMQ服务器建立连接
    credential = pika.PlainCredentials('yang','abc123456')#erase_on_connect是否清楚凭证
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',credentials=credential))
    #创建频道
    channel = connection.channel()#channel_number参数指定频道编号,建议默认pika自行管理
    #创建频道传递消息的队列
    channel.queue_declare(queue='hello')
    
    
    
    #向频道队列中发送消息
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='hello world!')
    
    print('[x]消息发送成功!')
    connection.close()#断开连接
    send.py

      receive.py

    import pika
    credentials = pika.PlainCredentials('yang','abc123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1',port=5672,credentials=credentials))
    channel = connection.channel()
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print(f'{body.decode()}')
    
    channel.basic_consume(queue='hello',
                          auto_ack=True,#开启自动确认关闭手动确认
                          on_message_callback=callback)
    
    print(' [*] Waiting for messages. ')
    channel.start_consuming()
    recieve.py

    02-WorkQueues(任务队列)

      new_task.py  

    '''
    任务生产者将任务发送到指定队列中,多个工作者进行均分协同处理(启多个工作者)
    '''
    import pika
    credentials = pika.PlainCredentials('yang','abc123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',5672,credentials=credentials))
    channel = connection.channel()
    
    channel.queue_declare(queue='work_queue',durable=True)#durable=True指定消息持久化,出现异常不会丢失。注意basic_publish需要设置参数
    
    for i in range(1000):
        message = f'new_task{i}...'
        channel.basic_publish(
            exchange='',
            routing_key='work_queue',
            body=message,
            properties = pika.BasicProperties(delivery_mode=2, )# 支持数据持久化:2代表消息是持久
        )
        print(f'Send>>>{message}')
    
    connection.close()
    new_task.py

      worker.py

    import time
    import random
    import pika
    credentials = pika.PlainCredentials('yang','abc123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',5672,credentials=credentials))
    channel = connection.channel()
    
    channel.queue_declare(queue='work_queue',durable=True)#durable=True指定消息持久化,出现异常不会丢失
    
    def callback(ch, method, properties, body):
        print(f'Rceive>>{body}')
        time.sleep(random.random())
        print(f'Done--{body.decode()}')
        # 手动确认机制(在消费者挂掉没有给确认时,消息不会丢失)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)#此设置确保在没有进行确认之前当前接受得任务只有1个(不设置默认是平均分配)
    channel.basic_consume(
        queue='work_queue',
        on_message_callback=callback
    )
    
    channel.start_consuming()
    worker.py

    03-PublishSubcribe(订阅发布)

      publish.py 

    '''
    发布者发布一条任务,通过交换机发送到与此交换机简历连接的所有队列中进行共享(启多个订阅者)
    '''
    import pika
    credentials = pika.PlainCredentials('yang','abc123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=credentials))
    
    channel = connection.channel()
    
    #创建频道的交换器(交换器负责将任务发送到连接词交换器的所有的队列中)
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout',#创建一个fanout(广播)类型的交换机exchange,名字为logs
                             durable=True)#durabl持久化
    
    for i in range(1000):
        message = f'new_task{i}...'
        channel.basic_publish(
            exchange='logs',#指定交换机
            routing_key='',#无需指定路由键队列,由交换机进行发送
            body=message,
            properties = pika.BasicProperties(delivery_mode=2, )# delivery_mode支持数据持久化:2代表消息是持久
        )
        print(f'Send>>>{message}')
    
    connection.close()
    publish.py

      subcribe.py

    import pika
    credentials = pika.PlainCredentials('yang','abc123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=credentials))
    
    channel = connection.channel()
    
    #与交换器建立连接
    channel.exchange_declare(exchange='logs',exchange_type='fanout',durable=True)#durable持久化
    
    #确定消息队列(不指定队列名,每个订阅者rabbirmq服务器连接都会创建一个随机队列)
    result= channel.queue_declare(queue='',exclusive=True,durable=True)#exclusive=True当断开连接时,队列销毁(持久化没有用,设置了断开销毁)
    queue_name= result.method.queue
    #与交换机新建的的随机队列进行绑定
    channel.queue_bind(exchange='logs',queue=queue_name)
    
    def callback(ch, method, properties, body):
        print(f'Rceive>>{body}')
        print(f'Done--{body.decode()}')
        # 手动确认机制(在消费者挂掉没有给确认时,消息不会丢失)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=callback
    )
    
    channel.start_consuming()
    subcribe.py

    06-RPC(远程过程调用)

      rpc_client.py

    import pika
    import uuid
    
    
    class RpcClient(object):
        def __init__(self):
            credentials = pika.PlainCredentials('yang', 'abc123456')
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, credentials=credentials))
    
            self.channel = self.connection.channel()
    
            # 随机创建一个唯一队列
            result = self.channel.queue_declare(queue='', exclusive=True)
            self.callback_queue = result.method.queue
    
            # 接收回调队列中的消息
            self.channel.basic_consume(
                queue=self.callback_queue,
                on_message_callback=self.on_response,
                auto_ack=True)
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, m, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            # 想通信队列放入消息
            self.channel.basic_publish(
                exchange='',
                routing_key='rpc_queue',
                properties=pika.BasicProperties(
                    delivery_mode=2,#持久化参数
                    # content_type='application/json',#指定body数据类型,不指定则为字符串(有待测试)
                    reply_to=self.callback_queue,  # 指定回调队列
                    correlation_id=self.corr_id,  # 指定本次请求标识id
                ),
                body=str(m) + ',' + str(n)
            )
    
    
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)
    
    
    clinet_rpc = RpcClient()
    
    print(" [x] Requesting add(30,30)")
    response = clinet_rpc.call(30,20)
    print(" [.] Got %r" % response)
    rpc_client.py

      rpc_server.py

    import pika
    
    credentials = pika.PlainCredentials('yang', 'abc123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, credentials=credentials))
    channel = connection.channel()
    
    channel.queue_declare(queue='rpc_queue', durable=True)  # durable=True消息持久化
    
    
    def add(m, n):
        return m + n
    
    def on_request(ch, method, props, body):
        m, n = body.decode().split(',')
        response = add(int(m), int(n))
    
        #将响应信息放入指定的回调队列中
        channel.basic_publish(
            exchange='',
            routing_key=props.reply_to,#请求指定的响应队列
            properties=pika.BasicProperties(correlation_id=props.correlation_id,),#请求设定的标识(无需设置持久化,每次响应之后回调队列销毁)
            body=str(response)
        )
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动回复确认
    
    
    channel.basic_qos(prefetch_count=1)  # 在没有确认前只分配一个任务
    channel.basic_consume(
        queue='rpc_queue',
        on_message_callback=on_request,
    )
    
    print('Awaiting RPC requests...')
    channel.start_consuming()
    rpc_server.py  
  • 相关阅读:
    如何利用Typora编写博客,快速发布到多平台?
    bios怎么开启ahci模式
    IDENTITY_INSERT 设置
    【热门测试技术,建议收藏备用】项目实战、简历、笔试题、面试题、职业规划
    k8s节点简介、移除节点、新增节点
    正确理解jmeter线程组之RampUp
    Ingress资源规范
    使用dockercompose.yml安装rabbitmq集群
    使用sonarqube对java项目进行分析
    SonarQube支持Gitlab授权登录
  • 原文地址:https://www.cnblogs.com/open-yang/p/11570009.html
Copyright © 2011-2022 走看看