zoukankan      html  css  js  c++  java
  • rabbitmq在centos7下安装

    一. RabbitMQ队列

    1
    2
    3
    4
    5
    #消息中间件 -消息队列
      - 异步 提交的任务不需要实时得到结果或回应
     
    python线程Q 实现了在同一个进程间不同线程间的交互
    python线程Q 也可以实现进程间Q的通信

    a. 安装

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    #Centos7 安装
     
    #注意/etc/hosts文件 ip和主机名对应
    wget https://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_10/rabbitmq-server-3.6.10-1.el7.noarch.rpm
    yum install epel-release -y
    yum install rabbitmq-server-3.6.10-1.el7.noarch.rpm
    rabbitmq-plugins enable rabbitmq_management
    cp /usr/share/doc/rabbitmq-server-3.6.10/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
    systemctl restart rabbitmq-server
    systemctl status rabbitmq-server
     
    #创建用户 授权
    rabbitmqctl  add_user alex alex3714
    rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"

     启动失败:

    [root@openstack ~]# systemctl start rabbitmq-server.service 
    Job for rabbitmq-server.service failed. See ‘systemctl status rabbitmq-server.service’ and ‘journalctl -xn’ for details. 
    
    
    [root@openstack ~]# systemctl status rabbitmq-server.service 
    rabbitmq-server.service – RabbitMQ broker 
    Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; enabled) 
    Active: failed (Result: exit-code) since 六 2016-04-30 23:19:21 CST; 8s ago 
    Process: 3593 ExecStop=/usr/lib/rabbitmq/bin/rabbitmqctl stop (code=exited, status=2) 
    Process: 3563 ExecStart=/usr/lib/rabbitmq/bin/rabbitmq-server (code=exited, status=1/FAILURE) 
    Main PID: 3563 (code=exited, status=1/FAILURE)
    
    4月 30 23:19:21 openstack rabbitmqctl[3593]: * epmd reports: node ‘rabbit’ not running at all 
    4月 30 23:19:21 openstack rabbitmqctl[3593]: no other nodes on openstack 
    4月 30 23:19:21 openstack rabbitmqctl[3593]: * suggestion: start the node 
    4月 30 23:19:21 openstack rabbitmqctl[3593]: current node details: 
    4月 30 23:19:21 openstack rabbitmqctl[3593]: – node name: rabbitmqctl3593@openstack 
    4月 30 23:19:21 openstack rabbitmqctl[3593]: – home dir: /var/lib/rabbitmq 
    4月 30 23:19:21 openstack rabbitmqctl[3593]: – cookie hash: DuHinHyRsf96Yx7NcAaAuQ== 
    4月 30 23:19:21 openstack systemd[1]: rabbitmq-server.service: control process exited, code=exited status=2 
    4月 30 23:19:21 openstack systemd[1]: Failed to start RabbitMQ broker. 
    4月 30 23:19:21 openstack systemd[1]: Unit rabbitmq-server.service entered failed state.
    

     解决办法:

    firewall-cmd --permanent --add-port=5672/tcp
    firewall-cmd --reload
    setsebool -P nis_enabled 1

    systemctl start rabbitmq-server再次启动:

    [root@oldboy_zny rabbitmq]# systemctl start rabbitmq-server
    [root@oldboy_zny rabbitmq]# systemctl status rabbitmq-server
    rabbitmq-server.service - RabbitMQ broker
       Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; enabled)
       Active: active (running) since 一 2018-03-05 10:33:48 CST; 8s ago
      Process: 22014 ExecStop=/usr/sbin/rabbitmqctl stop (code=exited, status=0/SUCCESS)
     Main PID: 22152 (beam)
       Status: "Initialized"
       CGroup: /system.slice/rabbitmq-server.service
               ├─22152 /usr/lib64/erlang/erts-5.10.4/bin/beam -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 32000 -K true -- -root /usr/lib64/erlang...
               ├─22441 inet_gethost 4
               └─22442 inet_gethost 4
    
    3月 05 10:33:47 oldboy_zny rabbitmq-server[22152]: ##  ##
    3月 05 10:33:47 oldboy_zny rabbitmq-server[22152]: ##########  Logs: /var/log/rabbitmq/rabbit@oldboy_zny.log
    3月 05 10:33:47 oldboy_zny rabbitmq-server[22152]: ######  ##        /var/log/rabbitmq/rabbit@oldboy_zny-sasl.log
    3月 05 10:33:47 oldboy_zny rabbitmq-server[22152]: ##########
    3月 05 10:33:47 oldboy_zny rabbitmq-server[22152]: Starting broker...
    3月 05 10:33:48 oldboy_zny rabbitmq-server[22152]: systemd unit for activation check: "bin"
    3月 05 10:33:48 oldboy_zny rabbitmq-server[22152]: 'systemctl' unavailable, falling back to sleep
    3月 05 10:33:48 oldboy_zny systemd[1]: Started RabbitMQ broker.
    3月 05 10:33:49 oldboy_zny python[22447]: SELinux is preventing /usr/bin/bash from getattr access on the file .
                                               
                                               *****  Plugin catchall (100. confidence) suggests   **************************...
    3月 05 10:33:53 oldboy_zny rabbitmq-server[22152]: completed with 6 plugins.
    Hint: Some lines were ellipsized, use -l to show in full.
    

      

    b. 创建用户 授权 

    #远程连接rabbitmq server的话,需要配置权限
    
    #创建用户
    rabbitmqctl  add_user alex alex3714
     
    #同时还要配置权限,允许从外面访问
    rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"
    
      set_permissions [-p vhost] {user} {conf} {write} {read}
    
      vhost
      The name of the virtual host to which to grant the user access, defaulting to /.
    
      user
      The name of the user to grant access to the specified virtual host.
    
      conf
      A regular expression matching resource names for which the user is granted configure permissions.
    
      write
      A regular expression matching resource names for which the user is granted write permissions.
    
      read
      A regular expression matching resource names for which the user is granted read permissions.

    c. python rabbitMQ module 安装

    1
    2
    3
    4
    5
    6
    7
    pip install pika
    or
    easy_install pika
    or
    源码
       
    https://pypi.python.org/pypi/pika

    二. 事例

    参考博客:http://www.cnblogs.com/yuanchenqi/articles/8507109.html

    1
    2
    3
    4
    注意: 一般申明队列(如下代码)只需要在服务端申明,但客户端也可以申明,是防止如果服务端没有启动,客户端先启动后没有队列会报错
         此时服务端如果有相同代码,会检查,如果有相同队列就不创建
     
    channel.queue_declare(queue='hello')

    a. 服务端和客户端一对一 

    1
    2
    3
    4
    5
    #查看队列
        # rabbitmqctl list_queues
     
    #客户端再次申明队列是因为客户端要清楚去哪里取数据
        channel.queue_declare(queue='hello')
    import pika
    
    credentials = pika.PlainCredentials("egon","egon123")                     #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #建立socket
    
    channel = connection.channel()            #创建rabbitmq协议通道
    
    channel.queue_declare(queue='hello')      #通过通道生成一个队列
    
    channel.basic_publish(exchange='',
                          routing_key='hello',      #队列
                          body='Hello World!')      #内容
    print(" [x] Sent 'Hello World!'")
    connection.close()
    sender.py
    import pika
    
    credentials = pika.PlainCredentials("egon","egon123")                     #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #建立socket
    
    channel = connection.channel()
    
    
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print(ch)              #上面channel = connection.channel()对象
        print(method)          #除了服务端本身的数据,还带一些参数
        print(properties)      #属性
        print(body)            #byte数据
    
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
    
    print(' Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    receive.py

    b. 消息持久化

    1. 模拟客户端中断 观察服务端队列的数据会不会返回(不会)

    1
    2
    3
    4
    #- 开启一个服务端,两个客户端
    #- 服务端向队列中存放一个值,一客户端从队列中取到数据,在睡20秒期间中断,表示出错,它不会报告给服务端
    #- 这时队列中为零,另一客户端也不会取到值
    # no_ack=True 表示客户端处理完了不需要向服务端确认消息
    import pika
    
    credentials = pika.PlainCredentials("egon","egon123")                     #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #建立socket
    
    channel = connection.channel()            #创建rabbitmq协议通道
    
    channel.queue_declare(queue='hello')      #通过通道生成一个队列
    
    channel.basic_publish(exchange='',
                          routing_key='hello',      #队列
                          body='Hello World!')      #内容
    print(" [x] Sent 'Hello World!'")
    connection.close()
    send.py
    import pika
    import time
    
    credentials = pika.PlainCredentials("egon","egon123")                     #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #建立socket
    
    channel = connection.channel()
    
    
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print("received msg...start process",body)
        time.sleep(10)
        print("end process...")
    
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
    
    print(' Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    receive.py

    2. 模拟客户端中断 观察服务端队列的数据会不会返回(会) 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    #1. 生产者端发消息时,加参数 消息持久化
        properties=pika.BasicProperties(
            delivery_mode=2# make message persistent
        ),
    #2. 消费者端,消息处理完毕时,发送确认包  
        ch.basic_ack(delivery_tag=method.delivery_tag)
     
        channel.basic_consume(callback, #取到消息后,调用callback 函数
          queue='task1',)
          #no_ack=True) #消息处理后,不向rabbit-server确认消息已消费完毕
    #- 开启一个服务端,两个客户端
    #- 服务端向队列中存放一个值,一客户端从队列中取到数据,在睡20秒期间中断,表示出错,它会报给服务端,服务端队列还有值
    #- 这时启动另一客户端还可以取到值
    import pika
    
    credentials = pika.PlainCredentials("egon","egon123")                     #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #建立socket
    
    channel = connection.channel()            #创建rabbitmq协议通道
    
    channel.queue_declare(queue='hello')      #通过通道生成一个队列
    
    channel.basic_publish(exchange='',
                          routing_key='hello',      #队列
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ),
                          body='Hello World!')      #内容
    print(" [x] Sent 'Hello World!'")
    connection.close()
    sender.py 
    import pika
    import time
    
    credentials = pika.PlainCredentials("egon","egon123")                     #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #建立socket
    
    channel = connection.channel()
    
    
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print("received msg...start process",body)
        time.sleep(10)
        print("end process...")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_consume(callback,
                          queue='hello',
                          )
    
    print(' Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    receive.py

    c. 队列持久化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    #队列持久化
     
    channel.queue_declare(queue='hello',durable=True)
    systemctl restart rabbitmq-server       #重启服务发现hello队列还在,但是消息不在
    rabbitmqctl list_queues
        #hello
     
     
    #队列和消息持久化
    channel.queue_declare(queue='hello',durable=True)
     
    properties=pika.BasicProperties(
        delivery_mode=2# make message persistent
    ),
    systemctl restart rabbitmq-server       #重启服务发现队列和消息都还在
    rabbitmqctl list_queues
        #hello 6
    import pika
    
    credentials = pika.PlainCredentials("egon","egon123")                     #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('192.168.11.106',credentials=credentials))  #建立socket
    
    channel = connection.channel()            #创建rabbitmq协议通道
    
    channel.queue_declare(queue='hello',durable=True)      #通过通道生成一个队列
    
    channel.basic_publish(exchange='',
                          routing_key='hello',      #队列
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ),
                          body='Hello World!')      #内容
    print(" [x] Sent 'Hello World!'")
    connection.close()
    sender.py

    d. fanout 广播

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    #服务端:
      - 不需要申明队列
    #客户端:
      - 每个客户端都需要申明一个队列,自动设置队列名称,收听广播,当收听完后queue删除
      - 把队列绑定到exchange上
    #注意:客户端先打开,服务端再打开,客户端会收到消息
      
    #应用:
      - 微博粉丝在线,博主发消息,粉丝可以收到
     
    #如果服务端先启动向exchange发消息,这时客户端没有启动,没有队列保存数据(exchange不负责保存数据)
    #这时数据会丢,队列中没有数据
    #exchange只负责转发
    import pika
    import sys
    import time
    
    
    credentials = pika.PlainCredentials("egon","egon123")                     #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('172.16.42.128',credentials=credentials))  #建立socket
    
    channel = connection.channel()                             #创建rabbitmq协议通道
    
    channel.exchange_declare(exchange='logs',type='fanout')
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    print(" Send %r" % message)
    connection.close()
    sender.py
    import pika
    import time
    
    credentials = pika.PlainCredentials("egon","egon123")                     #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('172.16.42.128',credentials=credentials))  #建立socket
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             type='fanout')
    
    queue_obj = channel.queue_declare(exclusive=True)  #随机创建一个队列对象 exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    queue_name = queue_obj.method.queue                #不指定queue名字,rabbit会随机分配一个名字,
    
    channel.queue_bind(exchange='logs',queue=queue_name)    #把queue绑定到exchange
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    receive.py

    e. direct 组播

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    #客户端一:
        - python3 receive1.py info
    #客户端二:
        - python3 receive1.py  error
    #客户端三:
        - python3 receive1.py  warning
    #客户端四:
        - python3 receive1.py  warning error info
    #服务端:
        - python3 receive1.py  warning
    import pika
    import sys
    import time
    
    
    credentials = pika.PlainCredentials("egon","egon123")                   #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('172.16.42.128',credentials=credentials))  #建立socket
    
    channel = connection.channel()                  #创建rabbitmq协议通道
    
    channel.exchange_declare(exchange='direct_logs',type='direct')
    
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    
    print(" Send %r:%r" % (severity, message))
    connection.close()
    sender.py
    import pika
    import time
    import sys
    
    credentials = pika.PlainCredentials("egon","egon123")                     #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('172.16.42.128',credentials=credentials))  #建立socket
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',type='direct')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
    
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    channel.start_consuming()
    receive.py

    f. topic 规则传播

    1
    2
    3
    4
    5
    6
    7
    8
    9
    #客户端一:
        - python3 receive1.py *.django
    #客户端二:
        - python3 receive1.py mysql.error
    #客户端三:
        - python3 receive1.py mysql.*
     
    #服务端:
        - python3 receive1.py  #匹配相应的客户端 
    import pika
    import time
    import sys
    
    credentials = pika.PlainCredentials("egon","egon123")                     #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('172.16.42.128',credentials=credentials))  #建立socket
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs',type='topic')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    binding_keys = sys.argv[1:]
    if not binding_keys:
        print(sys.argv[1:])
        sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
        sys.exit(1)
    
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    receive.py
    import pika
    import sys
    import time
    
    
    credentials = pika.PlainCredentials("egon","egon123")                   #授权的账号 密码
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('172.16.42.128',credentials=credentials))  #建立socket
    
    channel = connection.channel()                  #创建rabbitmq协议通道
    
    channel.exchange_declare(exchange='topic_logs',type='topic')
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    
    print(" [x] Sent %r:%r" % (routing_key, message))
    sender.py

    以生产者消费者举例四种模式:

      生产者:

    # ! /usr/bin/env python
    # -*- coding: utf-8 -*-
    #  Author :    张宁阳
    #  date:      2018/3/6
    # ######################### 简答模式生产者 #########################
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.20.56',port=5672,
            credentials=pika.credentials.PlainCredentials(
                username='alex',
                password='alex3714'
            )))
    
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='ssssssssssssss')
    
    print(" 已经 发送 'ssssssssssssssssss'")
    connection.close()
    
    
    ######################基于exchang分发模式的生产者###########################
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.20.56',port=5672,
            credentials=pika.credentials.PlainCredentials(
                username='alex',
                password='alex3714'
            )))
    
    channel = connection.channel()
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    message = "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    
    
    
    ##########基于exchange关键字模式的生产者##############
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.20.56',port=5672,
            credentials=pika.credentials.PlainCredentials(
                username='alex',
                password='alex3714'
            )))
    
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_logs',
                             exchange_type='direct')
    
    message = "info: Hello World!"
    channel.basic_publish(exchange='direct_logs',
                          routing_key='info',#关键字:info、error、warning
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    
    
    
    ##########基于exchange模糊匹配模式的生产者##############
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.20.56',port=5672,
            credentials=pika.credentials.PlainCredentials(
                username='alex',
                password='alex3714'
            )))
    
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    
    message = "info: Hello World!"
    #routing_key="模糊匹配字符"
    routing_keys=["old.boy.python ","old.boy"]
    for key in routing_keys:
        channel.basic_publish(exchange='topic_logs',
                              routing_key=key,
                              body=message)
        print(" [x] Sent %r" % message)
    connection.close()

      消费者

    # ! /usr/bin/env python
    # -*- coding: utf-8 -*-
    #  Author :    张宁阳
    #  date:      2018/3/6
    ##########################基于简单模式的 消费者 ##########################
    import pika
    
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.20.56',port=5672,
            credentials=pika.credentials.PlainCredentials(
                username='alex',
                password='alex3714'
            )))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print(" 已经 接受 %r" % body)
    
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    
    
    
    ##############基于exchange分发模式消费者#####################
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.20.56',port=5672,
            credentials=pika.credentials.PlainCredentials(
                username='alex',
                password='alex3714'
            )))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    channel.queue_bind(exchange='logs',
                       queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    
    ##########################基于exchange的关键字模式的 消费者 ##########################
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.20.56', port=5672,
                                                                   credentials=pika.credentials.PlainCredentials(
                                                                       username='alex',
                                                                       password='alex3714'
                                                                   )))
    
    channel = connection.channel()
    
    # 生成交换机
    channel.exchange_declare(exchange='direct_logs',
                             exchange_type='direct')
    
    # 创建的队列
    result = channel.queue_declare(exclusive=True)
    # 创建的队列的随机名称
    queue_name = result.method.queue
    
    # 给队列与direct_logs这个交换机绑定三个关键字
    severities = ["info", "warning", "error"]
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    
    
    ##########################基于exchange的模糊匹配模式的 消费者 ##########################
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.20.56', port=5672,
                                                                   credentials=pika.credentials.PlainCredentials(
                                                                       username='alex',
                                                                       password='alex3714'
                                                                   )))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    
    
    bind_keys=["old.*","old.#"]
    for bind_key in bind_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=bind_key)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    

     基于RabbitMQ的RPC

    回到顶部

    Callback queue 回调队列

    一个客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to

    Correlation id 关联标识

    一个客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

     
    客户端发送请求:某个应用将请求信息交给客户端,然后客户端发送RPC请求,在发送RPC请求到RPC请求队列时,客户端至少发送带有reply_to以及correlation_id两个属性的信息
    
    服务器端工作流: 等待接受客户端发来RPC请求,当请求出现的时候,服务器从RPC请求队列中取出请求,然后处理后,将响应发送到reply_to指定的回调队列中
    
    客户端接受处理结果: 客户端等待回调队列中出现响应,当响应出现时,它会根据响应中correlation_id字段的值,将其返回给对应的应用

    服务器端

    #!/usr/bin/env python
    import pika
    
    # 建立连接,服务器地址为localhost,可指定ip地址
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    
    # 建立会话
    channel = connection.channel()
    
    # 声明RPC请求队列
    channel.queue_declare(queue='rpc_queue')
    
    # 数据处理方法
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)
    
    # 对RPC请求队列中的请求进行处理
    def on_request(ch, method, props, body):
        n = int(body)
    
        print(" [.] fib(%s)" % n)
    
        # 调用数据处理方法
        response = fib(n)
    
        # 将处理结果(响应)发送到回调队列
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id = 
                                                             props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    # 负载均衡,同一时刻发送给该服务器的请求不超过一个
    channel.basic_qos(prefetch_count=1)
    
    channel.basic_consume(on_request, queue='rpc_queue')
    
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()

    客户端

    #!/usr/bin/env python
    import pika
    import uuid
    
    class FibonacciRpcClient(object):
        def __init__(self):
            ”“”
            客户端启动时,创建回调队列,会开启会话用于发送RPC请求以及接受响应
            
            “”“
            
            # 建立连接,指定服务器的ip地址
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                    host='localhost'))
                    
            # 建立一个会话,每个channel代表一个会话任务
            self.channel = self.connection.channel()
            
            # 声明回调队列,再次声明的原因是,服务器和客户端可能先后开启,该声明是幂等的,多次声明,但只生效一次
            result = self.channel.queue_declare(exclusive=True)
            # 将次队列指定为当前客户端的回调队列
            self.callback_queue = result.method.queue
            
            # 客户端订阅回调队列,当回调队列中有响应时,调用`on_response`方法对响应进行处理; 
            self.channel.basic_consume(self.on_response, no_ack=True,
                                       queue=self.callback_queue)
    
    
        # 对回调队列中的响应进行处理的函数
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
    
        # 发出RPC请求
        def call(self, n):
        
            # 初始化 response
            self.response = None
            
            #生成correlation_id 
            self.corr_id = str(uuid.uuid4())
            
            # 发送RPC请求内容到RPC请求队列`rpc_queue`,同时发送的还有`reply_to`和`correlation_id`
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                             reply_to = self.callback_queue,
                                             correlation_id = self.corr_id,
                                             ),
                                       body=str(n))
                                       
            
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)
    
    # 建立客户端
    fibonacci_rpc = FibonacciRpcClient()
    
    # 发送RPC请求
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)

     

      

    Alex 

  • 相关阅读:
    (二分查找 拓展) leetcode 69. Sqrt(x)
    (二分查找 拓展) leetcode 162. Find Peak Element && lintcode 75. Find Peak Element
    (链表) lintcode 219. Insert Node in Sorted Linked List
    (二分查找 拓展) leetcode 34. Find First and Last Position of Element in Sorted Array && lintcode 61. Search for a Range
    (最短路 Floyd) P2910 [USACO08OPEN]寻宝之路Clear And Present Danger 洛谷
    (字符串 数组 递归 双指针) leetcode 344. Reverse String
    (二叉树 DFS 递归) leetcode 112. Path Sum
    (二叉树 DFS 递归) leetcode 101. Symmetric Tree
    (二叉树 递归) leetcode 144. Binary Tree Preorder Traversal
    (二叉树 递归 DFS) leetcode 100. Same Tree
  • 原文地址:https://www.cnblogs.com/zhangningyang/p/8507745.html
Copyright © 2011-2022 走看看