zoukankan      html  css  js  c++  java
  • python 消息队列-rabbitMQ 和 redis介绍使用

    1、rabbitMQ 与ptyhon 进程queue 区别。进程queue 主要用户Python父子进程之间或者统一进程不同子进程。rabbit可以用户不同语言之前的相互交流,socket可以实现同样功能,但是较为复杂。

    2、 rabbitMQ  消息轮训。一个生产者对多个消费者时候。会自动将消息轮训给不同消费者。

    # Author : xiajinqi
    
    import pika
    
    connetction = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    
    channel = connetction.channel()
    
    channel.queue_declare(queue='hello')
    #  1个生产着,三个消费者,会自动轮训,其中一个消费者宕机后,消息会自动发给其他消费者处理。
    channel.basic_publish(exchange='',routing_key='hello',body='hello world!')
    
    print("消息已经发送")
    
    channel.close()
    
    
    
    # Author : xiajinqi
    import pika
    import time
    
    connetction = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    
    channel = connetction.channel()
    
    channel.queue_declare(queue='hello')  #避免生产者后启动,没有这个队列报错。所以在此申明
    
    def  callback(ch,method,properties,body):
        '''
        :param ch:  管道对象内存地址
        :param method: 发消息给谁的申明信息
        :param properties:
        :param body:
        :return:
        '''
        print(ch,method,properties,body)
        ch.basic_ack(delivery_tag=method.delivery_tag)  #执行完以后告诉服务端
       # time.sleep(30)
        pass
    
    ## 申明收到调用callbak处理  no_ack 默认为false消息不会丢失,表示需要客户端回调函数处理完,主动告诉服务端已经处理完。为true断电消息会丢失
    #channel.basic_consume(callback,queue='hello',no_ack='True')
    channel.basic_consume(callback,queue='hello')
    
    print("开始收消息")
    
    channel.start_consuming()

    3、服务端消息持久化声明

    channel.queue_declare(queue='hello1',durable='True') # durable队列持久化申明
    #  1个生产着,三个消费者,会自动轮训,其中一个消费者宕机后,消息会自动发给其他消费者处理。
    #delivery_mode 消息持久化声明
    channel.basic_publish(exchange='',routing_key='hello1',body='hello world!',properties=pika.BasicProperties(delivery_mode=2))

     4、消息轮训,机器性能不同时候,可能要求每个机器权重不一样。实现机制如下。prefetch_count =1 只要有消息没有处理完。自动转化给其他人

    # Author : xiajinqi
    import pika
    import time
    
    connetction = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    
    channel = connetction.channel()
    
    channel.queue_declare(queue='hello2',durable="True")  #避免生产者后启动,没有这个队列报错。所以在此申明
    
    def  callback(ch,method,properties,body):
        '''
        :param ch:  管道对象内存地址
        :param method: 发消息给谁的申明信息
        :param properties:
        :param body:
        :return:
        '''
        print(ch,method,properties,body)
        ch.basic_ack(delivery_tag=method.delivery_tag)  #执行完以后告诉服务端
       # time.sleep(30)
        pass
    
    
    #channel.basic_consume(callback,queue='hello',no_ack='True')
    channel.basic_consume(callback,queue='hello2')
    
    channel.basic_qos(prefetch_count=1) 
    print("开始收消息")
    
    channel.start_consuming()
    

    5、exchange 中的fanout模式,广播模式。服务端生成一个交换器exchange(广播台开始广播)。客户端每次随机生成一个队列(收音机)。然后绑定exchange。

    广播模式中。会发给所有绑定exchange的队列。消息不会缓存。类似于广播。消息是实时的。即广播时候。用户不在消息不缓存。用户不在线。消息依旧发送

    # Author : xiajinqi
    
    import pika
    
    connetction = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    
    channel = connetction.channel()
    
    #channel.queue_declare(queue='hello1',durable='True') #
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    
    channel.basic_publish(exchange='logs',routing_key='',body='hello world!',properties=pika.BasicProperties(delivery_mode=2))
    
    print("消息已经发送")
    
    channel.close()
    #E:RB
    abbitmq_server-3.7.7sbin
    abbitmqctl.bat list_queues
    
    
    
    # Author : xiajinqi
    import pika
    import time
    
    connetction = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    
    channel = connetction.channel()
    
    #channel.queue_declare(queue='hello2',durable="True")  #避免生产者后启动,没有这个队列报错。所以在此申明
    
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    
    result = channel.queue_declare(exclusive='True') #客户端每次启动会随机生成一个队列
    queue_name = result.method.queue  #  获取随机生成的队列名
    
    
    # exchange 绑定 queue
    channel.queue_bind(exchange='logs',queue=queue_name)
    
    def  callback(ch,method,properties,body):
        '''
        :param ch:  管道对象内存地址
        :param method: 发消息给谁的申明信息
        :param properties:
        :param body:
        :return:
        '''
        print(ch,method,properties,body)
        ch.basic_ack(delivery_tag=method.delivery_tag)  #执行完以后告诉服务端
       # time.sleep(30)
        pass
    
    ## 申明收到调用callbak处理  no_ack 默认为false消息不会丢失,表示需要客户端回调函数处理完,主动告诉服务端已经处理完。为true断电消息会丢失
    #channel.basic_consume(callback,queue='hello',no_ack='True')
    channel.basic_consume(callback,queue=queue_name)
    
    channel.basic_qos(prefetch_count=1)
    print("开始收消息")
    
    channel.start_consuming()

    6、direct 模式。

    # Author : xiajinqi
    
    import pika
    import sys
    connetction = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    
    channel = connetction.channel()
    
    #channel.queue_declare(queue='hello1',durable='True') #
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
    
    serverity =  sys.argv[1]  if len(sys.argv)> 1 else 'info'
    message = ''.join(sys.argv[2:]) or 'hello world'
    
    channel.basic_publish(exchange='direct_logs',routing_key=serverity,body=message,properties=pika.BasicProperties(delivery_mode=2))
    
    print("消息已经发送")
    
    channel.close()
    
    
    # Author : xiajinqi
    import pika
    import time
    import sys
    connetction = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    
    channel = connetction.channel()
    
    #channel.queue_declare(queue='hello2',durable="True")  #避免生产者后启动,没有这个队列报错。所以在此申明
    
    
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
    
    result = channel.queue_declare(exclusive='True') #客户端每次启动会随机生成一个队列
    
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
    
    serverity =  sys.argv[1:]
    
    if not serverity:
        print("退出")
        sys.exit()
    
    
    queue_name = result.method.queue  #  获取随机生成的队列名
    
    
    
    for st in serverity:
        channel.queue_bind(exchange='direct_logs', queue=queue_name,routing_key=st)
    
    
    
    
    
    
    def  callback(ch,method,properties,body):
        '''
        :param ch:  管道对象内存地址
        :param method: 发消息给谁的申明信息
        :param properties:
        :param body:
        :return:
        '''
        print(ch,method,properties,body)
        ch.basic_ack(delivery_tag=method.delivery_tag)  #执行完以后告诉服务端
       # time.sleep(30)
        pass
    
    ## 申明收到调用callbak处理  no_ack 默认为false消息不会丢失,表示需要客户端回调函数处理完,主动告诉服务端已经处理完。为true断电消息会丢失
    #channel.basic_consume(callback,queue='hello',no_ack='True')
    channel.basic_consume(callback,queue=queue_name)
    
    channel.basic_qos(prefetch_count=1)
    print("开始收消息")
    
    channel.start_consuming()

     7、topic 模式 和 direct 可以动态匹配

    # Author : xiajinqi
    
    import pika
    import sys
    connetction = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    
    channel = connetction.channel()
    
    #channel.queue_declare(queue='hello1',durable='True') #
    channel.exchange_declare(exchange='direct_logs',exchange_type='topic')
    
    serverity =  sys.argv[1]  if len(sys.argv)> 1 else 'info'
    message = ''.join(sys.argv[2:]) or 'hello world'
    
    channel.basic_publish(exchange='direct_logs',routing_key=serverity,body=message,properties=pika.BasicProperties(delivery_mode=2))
    
    print("消息已经发送")
    
    channel.close()

    8、

      

  • 相关阅读:
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法训练 排列问题
    Java实现 蓝桥杯VIP 算法训练 排列问题
    Java实现 蓝桥杯VIP 算法训练 排列问题
    Java实现 蓝桥杯VIP 算法训练 排列问题
    关于模态/非模态对话框不响应菜单的UPDATE_COMMAND_UI消息(对对WM_INITMENUPOPUP消息的处理)
  • 原文地址:https://www.cnblogs.com/xiajq/p/9536421.html
Copyright © 2011-2022 走看看