zoukankan      html  css  js  c++  java
  • RabbitMQ 消息队列

    背景:python的Queue消息队列,只能python自己用。

    --线程threading Queue 只能在一个进程间交互数据

    --进程Queue最多可以父进程和多个子进程进行交互

    常见队列工具:RabbitMQ、ZeroMQ、ActiveMQ

    RabbitMQ 使用erlang语言开发的。

    Python中和RabbitMQ交互的工具有pika、celery、Haigha

     https://www.cnblogs.com/alex3714/articles/5248247.html

    #!/usr/bin/env python
    # Author:Zhangmingda
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel() #声明一个管道
    channel.queue_declare(queue='hello',durable=True) #声明queue,持久化存储消息队列:durable=True
    '''消息持久化:rabbitmq服务器在重启时不会丢失队列消息'''
    channel.basic_publish( #向队列里面推送消息
        exchange='',
        routing_key='hello',
        body='hello World!',
        properties=pika.BasicProperties(delivery_mode=2,) #队列中的消息持久化,
    )
    connection.close() #关闭socket
    pika-make-producer
    #!/usr/bin/env python
    # Author:Zhangmingda
    import pika,time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello',durable=True)
    
    #不知道服务端还是客户端会先启动,接收端也声明队列名
    
    def callback(ch,method,properties,body): #回调函数,在接收到消息后用来处理消息内容:body
        print('ch:',ch)
        print('method',method)
        print('properties',properties)
        print('body',body)
        print('sleep....')
        time.sleep(10)
        ch.basic_ack(delivery_tag=method.delivery_tag) #向rabbitmq发送任务已经执行结束的消息,然后rabbitmq删除队列中这个消息
        print('callback is over!')
    channel.basic_qos(prefetch_count=3) #消息排队数量 根据处理器能力而定
    channel.basic_consume(callback,
                     queue='hello')#)
                     # no_ack=True) #如果设置了no_ack (自动回复确认消息给rabbitmq,那么rabbitmq就不会保留消息队列,不论回调函数是否执行完成。)
                                  #一般都不设置这个东西,就需要人为指定在哪里给rabbitmq设置删除队列消息的指令,保证只要没有收到消费端处理的结果就再rabbitmq中保留这个消息。然后交给下一个消费者继续重新做这个任务
    
    print('一直等消息,么有了就卡住,to exit press CTRL+C')
    channel.start_consuming()
    pika-receive-consumer(从rabbitmq取消息)

    同时向多个客户端发消息(广播)

    PublishSubscribe(消息发布订阅) 

    之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,

    An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

    Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息


    fanout: 所有bind到此exchange的queue都可以接收消息
    direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
    topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

       表达式符号说明:#代表一个或多个字符,*代表任何字符
          例:#.a会匹配a.a,aa.a,aaa.a等
              *.a会匹配a.a,b.a,c.a等
         注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 

    headers: 通过headers 来决定把消息发给哪些queue

    fanout: 所有bind到此exchange的queue都可以接收消息如下代码

    #!/usr/bin/env python
    # Author:Zhangmingda
    
    import pika,sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel() #声明一个管道
    # channel.queue_declare(queue='hello',durable=True) #声明queue,持久化存储消息队列:durable=True
    '''广播发起时无需指定队列名字,统一由exchange分发'''
    
    '''exchange 转发器进行广播,无需声明消息队列名,消息发过之后就没了'''
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
    channel.basic_publish( #向队列里面推送消息
        exchange='logs', #向转发器发消息
        routing_key='', #无队列名,但是要置空
        body=message, #发送的消息内容
        properties=pika.BasicProperties(delivery_mode=2,) #队列中的消息持久化,
    )
    print('send message :',message)
    connection.close() #关闭socket
    发消息端publish广播端
    #!/usr/bin/env python
    # Author:Zhangmingda
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    # channel.queue_declare(queue='hello',durable=True)接收消息不直接通过队列,而是通过下面绑定到exchange上的临时队列来接收
    channel.exchange_declare(exchange='logs',exchange_type='fanout') #声明要从哪个转发器接收广播
    result = channel.queue_declare(exclusive=True)#获取消息队列实例。不指定queue名字,rabbit会随机分配一个名字,exclusive=True指的是使用此queue的消费者断开后,自动删除这个queue
    queue_name = result.method.queue #从消息队列实例取消息队列名字
    channel.queue_bind(exchange='logs',queue=queue_name) #将消息queue名字和转发器exchange绑定
    '''接收广播消息,就像收音机,服务器那边发过之后不会保留消息,发当时没接收,就接收不到了'''
    def callback(ch,method,properties,body): #回调函数,在接收到消息后用来处理消息内容:body
        print('ch:',ch)
        print('method',method)
        print('properties',properties)
        print('body',body)
        ch.basic_ack(delivery_tag=method.delivery_tag) #向rabbitmq发送任务已经执行结束的消息,然后rabbitmq删除队列中这个消息
        print('callback is over!')
    channel.basic_qos(prefetch_count=1) #消息排队数量
    channel.basic_consume(callback,
                     queue=queue_name)#从获取queue的临时的queue_name读任务
                     # no_ack=True) #如果设置了no_ack (自动回复确认消息给rabbitmq,那么rabbitmq就不会保留消息队列,不论回调函数是否执行完成。)
                                  #一般都不设置这个东西,就需要人为指定在哪里给rabbitmq设置删除队列消息的指令,保证只要没有收到消费端处理的结果就再rabbitmq中保留这个消息。然后交给下一个消费者继续重新做这个任务
    
    print('一直等消息,么有了就卡住,to exit press CTRL+C')
    channel.start_consuming()
    收消息端subscribe(订阅者)

    direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息

    只收集特定的队列消息,比如收集指定级别的日志(如下脚本通过命令行执行,来发出不同级别的日志进行测试。&收集不同级别的日志)

    #!/usr/bin/env python
    # Author:Zhangmingda
    
    import pika,sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel() #声明一个管道
    # channel.queue_declare(queue='hello',durable=True) #声明queue,持久化存储消息队列:durable=True
    '''广播发起时无需指定队列名字,统一由exchange分发'''
    
    '''exchange 转发器进行广播,无需声明消息队列名,消息发过之后就没了'''
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct') #direct指向
    
    serverity = sys.argv[1] if len(sys.argv) > 1 else 'error' #在发送日志时候定义消息的queue名字,本例作为日志收集的级别
    
    message = ' '.join(sys.argv[2:]) or '%s:Hello World!'%serverity
    
    channel.basic_publish( #向队列里面推送消息
        exchange='logs', #向转发器发消息
        routing_key=serverity, #重要程度serverity作为队列名,本例不同的队列传递不一样的日志内容
        body=message, #发送的消息内容
        properties=pika.BasicProperties(delivery_mode=2,) #队列中的消息持久化,
    )
    print('send message :',message)
    connection.close() #关闭socket
    向rabbitmq发送不同级别的日志(publish)
    #!/usr/bin/env python
    # Author:Zhangmingda
    import pika,sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    # channel.queue_declare(queue='hello',durable=True)接收消息不直接通过队列,而是通过下面绑定到exchange上的临时队列来接收
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct') #声明要从哪个转发器接收广播
    result = channel.queue_declare(exclusive=True)#获取消息队列实例。不指定queue名字,rabbit会随机分配一个名字,exclusive=True指的是使用此queue的消费者断开后,自动删除这个queue
    queue_name = result.method.queue #从消息队列实例取消息队列名字
    serverites = sys.argv[1:] #从命令行读取要收集几个级别的日志内容,可以是多个日志级别都收集
    
    if not serverites: #如果没有指定要收集的日志级别,就退出脚本,并给出用法
        sys.stderr.write('Usage: %s [info] [warning] [error]
    ' % sys.argv[0])
        sys.exit(1)
    for serverity in serverites: #指定了日志级别,就全都绑定到exchange转发器
        channel.queue_bind(exchange='logs',
                           queue=queue_name,
                           routing_key=serverity) #将消息queue名字和转发器exchange绑定
    '''接收广播消息,就像收音机,服务器那边发过之后不会保留消息,发当时没接收,就接收不到了'''
    
    def callback(ch,method,properties,body): #回调函数,在接收到消息后用来处理消息内容:body
        print('body',body)
        ch.basic_ack(delivery_tag=method.delivery_tag) #向rabbitmq发送任务已经执行结束的消息,然后rabbitmq删除队列中这个消息
        print('callback is over!')
    channel.basic_qos(prefetch_count=1) #消息排队数量
    channel.basic_consume(callback,
                     queue=queue_name)#从获取queue的临时的queue_name读任务
                     # no_ack=True) #如果设置了no_ack (自动回复确认消息给rabbitmq,那么rabbitmq就不会保留消息队列,不论回调函数是否执行完成。)
                                  #一般都不设置这个东西,就需要人为指定在哪里给rabbitmq设置删除队列消息的指令,保证只要没有收到消费端处理的结果就再rabbitmq中保留这个消息。然后交给下一个消费者继续重新做这个任务
    print('一直等消息,么有了就卡住,to exit press CTRL+C')
    channel.start_consuming() #开始收集
    从rabbitmq获取指定级别的日志内容进行处理(consumer)

    说明:指定 info err warning 则任何级别的都能收到,只指定info则只获取info的queue的消息

    topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

    #!/usr/bin/env python
    # Author:Zhangmingda
    
    import pika,sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel() #声明一个管道
    # channel.queue_declare(queue='hello',durable=True) #声明queue,持久化存储消息队列:durable=True
    '''广播发起时无需指定队列名字,统一由exchange分发'''
    
    '''exchange 转发器进行广播,无需声明消息队列名,消息发过之后就没了'''
    channel.exchange_declare(exchange='direct_logs',exchange_type='topic') #direct指向
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' #在发送日志时候定义消息的queue名字,本例作为日志收集的级别
    message = ' '.join(sys.argv[2:]) or '%s:Hello World!'% routing_key
    
    channel.basic_publish( #向队列里面推送消息
        exchange='logs', #向转发器发消息
        routing_key=routing_key, #重要程度serverity作为队列名,本例不同的队列传递不一样的日志内容
        body=message, #发送的消息内容
        properties=pika.BasicProperties(delivery_mode=2,) #队列中的消息持久化,
    )
    print('send message :',message)
    connection.close() #关闭socket
    topic_publish
    #!/usr/bin/env python
    # Author:Zhangmingda
    import pika,sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    # channel.queue_declare(queue='hello',durable=True)接收消息不直接通过队列,而是通过下面绑定到exchange上的临时队列来接收
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct') #声明要从哪个转发器接收广播
    result = channel.queue_declare(exclusive=True)#获取消息队列实例。不指定queue名字,rabbit会随机分配一个名字,exclusive=True指的是使用此queue的消费者断开后,自动删除这个queue
    queue_name = result.method.queue #从消息队列实例取消息队列名字
    binding_keys = sys.argv[1:] #从命令行读取要收集几个级别的日志内容,可以是多个日志级别都收集
    
    if not binding_keys: #如果没有指定要收集的日志级别,就退出脚本,并给出用法
        sys.stderr.write('Usage: %s [info] [warning] [error]
    ' % sys.argv[0])
        sys.exit(1)
    for binding_key in binding_keys: #指定了日志级别,就全都绑定到exchange转发器
        channel.queue_bind(exchange='logs',
                           queue=queue_name,
                           routing_key=binding_key) #将消息queue名字和转发器exchange绑定
    '''接收广播消息,就像收音机,服务器那边发过之后不会保留消息,发当时没接收,就接收不到了'''
    
    def callback(ch,method,properties,body): #回调函数,在接收到消息后用来处理消息内容:body
        print('body',body)
        ch.basic_ack(delivery_tag=method.delivery_tag) #向rabbitmq发送任务已经执行结束的消息,然后rabbitmq删除队列中这个消息
        print('callback is over!')
    channel.basic_qos(prefetch_count=1) #消息排队数量
    channel.basic_consume(callback,
                     queue=queue_name)#从获取queue的临时的queue_name读任务
                     # no_ack=True) #如果设置了no_ack (自动回复确认消息给rabbitmq,那么rabbitmq就不会保留消息队列,不论回调函数是否执行完成。)
                                  #一般都不设置这个东西,就需要人为指定在哪里给rabbitmq设置删除队列消息的指令,保证只要没有收到消费端处理的结果就再rabbitmq中保留这个消息。然后交给下一个消费者继续重新做这个任务
    print('一直等消息,么有了就卡住,to exit press CTRL+C')
    channel.start_consuming() #开始收集
    topic_consumer

    To receive all the logs run:

    python receive_logs_topic.py "#" 通配符
    

    To receive all logs from the facility "kern":  匹配以kern开头

    python receive_logs_topic.py "kern.*"
    

    Or if you want to hear only about "critical" logs: 匹配以critical结尾

    python receive_logs_topic.py "*.critical"
    

    You can create multiple bindings:  匹配多个条件

    python receive_logs_topic.py "kern.*" "*.critical"
    

    And to emit a log with a routing key "kern.critical" type:

    python emit_log_topic.py "kern.critical" "A critical kernel error"
  • 相关阅读:
    如何禁止在DBGRID末位自动添加一行记录
    DELPHI加密字串(异或运算加密)
    SQL SERVER 正则替换
    sql里的正则表达式
    UFIDA
    delphi raise 语句: 抛出异常
    delphi怎么一次性动态删除(释放)数个动态创建的组件?
    Delphi动态创建组件,并释放内存
    DELPHI 动态 创建和释放 多个 EDIT 控件
    禁止在DBGrid中按delete删除记录
  • 原文地址:https://www.cnblogs.com/zhangmingda/p/9461407.html
Copyright © 2011-2022 走看看