zoukankan      html  css  js  c++  java
  • Ribbitmq

    rittbiMQ:  

    连接远程rabbitmq server

        sudo rabbitmqctl addser mihon mihon123

        sudo rabbitmqctl set_permissions -p / mihon ".*" ".*" ".*"

        set_permissions [-p vhost] {user} {conf} {write} {read}

    客户端连接的时候需要配置认证参数:

         credentials =  pika.PlainCreadentials('mihon','mihon123')

         connection = pika.BlockingConnection(pika.ConnectionParameters(

         '172.20.30.10',5672,'/',credentials))

         channel = connection.channel()

    消息持久化

         channel.queue_=declare(queue='task_queue', durable=Ture) 

    公平分发:

         在消费者端配置perfetch=1,当消息还没有处理完的时候不不在接受消息     

    带消息持久化+公平分发

    生产者端:

        import pika

        import sys

       

        connection = pika.BlockingConnection(pika.ConnectionParameters(

            host='localhost'))

       

        channel.queue_declare(queue='task_queue',durable=Ture)

        message = '.'.join(sys.argv[1:] or "hello world!")

        channel.basic_publish(exchange='',

                            routing_key='task_queue',

                            body=message,

                            properties=pika.BasicProperties(

                                delivery_mode = 2, #make messa  persistent

                            ))   

        print(" [x] Sent %r"%message)

        connection.close()

    消费者端:

        import pika

        import time

        connection = pika.BasicProperties(pika.ConnectionParameters(

            host="localhost"))

        channel = connection.channel()

       

        channel.queue_declare(queue='task_queue',durable=Ture)

        print(' [*]Waiting for message. To exit press CTRL+C')

        def callback(ch,method,properties,body):

            print('asdfasdf')

            time.sleep(body.count(b'.'))

            print('111111')

            ch.basic_ack(delivery_tag = method.delivery_tag)

           

         channel.basic_qos(prefetch_count=1) #公平分发

         channel.basic_consume(callback,

                                queue='task_queue')

          channel.srart_consuming()  #启动接收

    PublishSubscribe

        exchange

        fanout:所有bind到exchange的queue都可以接收到消息

        direct:通过routingKey和exchange决定的哪一个唯一的queue可以接收消息

        topic:所有符合routingKey的routingKey所bind的queue可以接收消息

        headers:通过headers 来决定把消息发给那些queue

       

    有选择的接收消息(exchange type=direct)

         publisher:

            channel.exchange_declare(exchange='direct_logs',type='direct')

            channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)

         subscriber:

            channel.exchange_declare(exchange='direct_logs',type='direct')

            result = channel.queue_declare(exclusive=True)

            for severity inseverities:

                channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

         

  • 相关阅读:
    函数和常用模块【day04】:函数式编程(六)
    函数和常用模块【day04】:递归(五)
    函数和常用模块【day04】:函数参数及调用(二)
    函数和常用模块【day04】:函数介绍(一)
    第一模块:python基础语法
    Python基础【day03】:集合进阶(四)
    Python基础【day03】:字典进阶(二)
    Python基础【day02】:数据运算(二)
    Python基础【day01】:表达式if ...else语句(三)
    Python基础【day01】:Hello World程序(二)
  • 原文地址:https://www.cnblogs.com/mihon/p/8980807.html
Copyright © 2011-2022 走看看