zoukankan      html  css  js  c++  java
  • rabbitmq

    ubuntu安装rabbitmq-server和python安装pika:

    sudo apt update

    sudo apt install rabbitmq-server

    sudo netstat -tulnp  #查看 5672

    ps -ef|grep rabbitmq-server
    pip install pika

    1、队列、消息持久化,接收端确认

    #send.py

    import
    pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True)  #队列持久化 message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
    #receive.py

    import
    pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag)  #消费确认 channel.basic_qos(prefetch_count=1)  #每次接收一个 channel.basic_consume(callback, queue='task_queue') channel.start_consuming()

    2、发布订阅模式:fanout

    #send.py

    import
    pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
    #receive.py

    import
    pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(exclusive=True)  #由系统为用户生成独一无二的queue_name queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

    3、 exchange:direct 组播

    # emit_log_direct.py
    
    
    import pika
    import sys
    
    #establish connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')  #转换器名:direct_logs
    
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'  #列表中索引为1的值是严重程度,0是文件名
    message = ''.join(sys.argv[2:]) or 'hello world'  #消息
    
    channel.basic_publish(exchange='direct_logs',  
                          routing_key=severity,
                          body=message,
                         )
    
    print('[x] sent %r:%r'%(severity,message))
    
    connection.close()
    #receive_log_direct.py
    
    
    import pika
    import sys
    
    #establish connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
    
    result = channel.queue_declare(exclusive=True)
    #Exclusive (used by only one connection and the queue will be deleted when that connection closes)
    #为消费者随机取队列名,保证所有消费者的队列名不重复,另外连接断开就删除自己的队列
    
    queue_name = result.method.queue
    
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage:%s [info] [warning] [error]
    "% sys.argv[0])
        sys.exit(1)
    
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',  
                           queue=queue_name,
                           routing_key=severity)
    #绑定到名为‘direct_logs’的转发器上,生产者的转发器给queue为queue_name,routing_key为severity的队列中推送消息
    
    def callback(ch,method,properties,body):
        print(" [x] %r:%r" % (method.routing_key, body))  #取出的消息是:routing_key为severity
    channel.basic_consume(callback,queue=queue_name,no_ack=True) print('[*] waiting for msg.to exit press ctrl+c') channel.start_consuming() #死循环

    用法:命令行中

      生产者:

    python producer.py info this is info
    [x] sent 'info':'thisisinfo'

      消费者1:接收error

    python recieve.py error
    [*] waiting for msg.to exit press ctrl+c

      消费者2:接收info error

    python recieve.py  info error
    [*] waiting for msg.to exit press ctrl+c
     [x] 'info':b'thisisinfo'

      消费者3:接收info

    python recieve.py info
    [*] waiting for msg.to exit press ctrl+c
     [x] 'info':b'thisisinfo'

    4、exchange:topic

    • * (star) can substitute for exactly one word.
    • # (hash) can substitute for zero or more words.
    # emit_log_topic.py
    
    
    import pika
    import sys
    
    #establish connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ''.join(sys.argv[2:]) or 'hello world'
    
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message,
                         )
    
    print('[x] sent %r:%r'%(routing_key,message))
    
    connection.close()
    #receive_log_topic.py

    import
    pika import sys #establish connection connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs',exchange_type='topic') result = channel.queue_declare(exclusive=True) #Exclusive (used by only one connection and the queue will be deleted when that connection closes) #为消费者随机取队列名,保证所有消费者的队列名不重复,另外连接断开就删除自己的队列 queue_name = result.method.queue bingding_keys = sys.argv[1:] if not bingding_keys: sys.stderr.write("Usage:%s [bingding_keys]... "% sys.argv[0]) sys.exit(1) for bingding_key in bingding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=bingding_key) def callback(ch,method,properties,body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,queue=queue_name,no_ack=True) print('[*] waiting for msg.to exit press ctrl+c') channel.start_consuming() #死循环
    渐变 --> 突变
  • 相关阅读:
    docker-dockerfile构建与部署nginx
    淘宝镜像安装
    css3 中的变量 var 的使用
    CSS样式清除
    css 样式初始化(rem兼容)
    canvas截屏网页为图片下载到本地-html2canvas.js
    移除JSON对象中的某个属性
    js 常用方法集合(持续更新)
    小程序获取上个页面vm对象 解决百度小程序返回上一页不更新onShow更新(适用于uni-app)
    小程序 请求Promise简单封装
  • 原文地址:https://www.cnblogs.com/lybpy/p/8673663.html
Copyright © 2011-2022 走看看