zoukankan      html  css  js  c++  java
  • RABBITMQ队列

    安装python rabbitMQ module 

    pip install pika
    官网 https://pypi.python.org/pypi/pika

    安装rabbit-server服务,centos7系统

    这个中间商就是MQ erlang语言支持的
    
    安装依赖ERLANG
    wget https://mirrors.tuna.tsinghua.edu.cn/epel/7/x86_64/Packages/e/epel-release-7-11.noarch.rpm
    rpm -ivh epel-release-7-11.noarch.rpm
    yum repolist
    yum clean all
    yum install erlang
    
    安装依赖SOCAT
    yum - y install socat
    
    安装RABBITMQ
    rpm - ivh https: // bintray.com / rabbitmq / rabbitmq - server - rpm / download_file?file_path = rabbitmq - server - 3.6.10 - 1.el7.noarch.rpm
    /sbin/service rabbitmq - server start
    
    端口号 5672

    简单队列通信

    远程连接rabbit server的话,需要配置远程用户,权限

    在rabbitmq server上创建用户

    rabbitmqctl add_user joker 123456  

    配置权限,允许从外面访问

    rabbitmqctl  set_permissions -p "/" joker '.*' '.*' '.*'
    

    set_permissions [-p vhost] {user} {conf} {write} {read}

    vhost

    The name of the virtual host to which to grant the user access, defaulting to /.

    user

    The name of the user to grant access to the specified virtual host.

    conf

    A regular expression matching resource names for which the user is granted configure permissions.

    write

    A regular expression matching resource names for which the user is granted write permissions.

    read

    A regular expression matching resource names for which the user is granted read permissions.

    消息队列收发端连接到远程的rabbit-server需要配置认证参数 

    credentials = pika.PlainCredentials('joker', 'joker123')
     
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        'remoteip',5672,'/',credentials))
    channel = connection.channel()

    send端

    import pika
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   'localhost'))    # 建立socket
    channel = connection.channel()  # 建立通道
     
    #声明queue
    channel.queue_declare(queue='hello') # 通道里面声明队列
     
    #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    channel.basic_publish(exchange='',
                          routing_key='hello', # 队列
                          body='Hello World!') # 消息
    print(" [x] Sent 'Hello World!'")
    connection.close()

    receive端 

    import pika
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   'localhost'))
    channel = connection.channel()
     
     
    #You may ask why we declare the queue again ‒ we have already declared it in our previous code.
    # We could avoid that if we were sure that the queue already exists. For example if send.py program
    #was run before. But we're not yet sure which program to run first. In such cases it's a good
    # practice to repeat declaring the queue in both programs.
    channel.queue_declare(queue='hello')
     
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
     
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)    # 这个是不需要接收确认信息,后面会继续说
     
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming() # 开始接收消息,这里是阻塞状态,一直接收下去

    Work Queues

    这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多

    采用轮训机制,如果有多个消费者,会按照先后顺序发送消息

    消费者处理完了告诉生产者处理完了,生产者就会从消息队列删除消息,如果没有处理完宕机了,那消息就没了

    no_ack=FALSE

    # 当消费者执行任务的时候断掉,那么这个消息就掉了,no_ack注销掉,消费者就必须给服务器信息,是否执行完了,手动给服务器确认
    # 判断断掉的机制就是SOCKET断了 

    ch.basic_ack(delivery_tag=method.delivery_tag) # 手动跟服务端确认

    消息持久化

    channel.queue_declare(queue='hello', durable=True) # 持久化消息队列名称
    channel.basic_publish(exchange='',
                          routing_key="task_queue",
                          body=message,
                          properties=pika.BasicProperties(
                             delivery_mode = 2, # make message persistent # 消息持久化
                          ))

    服务器宕机,队列里的消息持久在  

     

    消息公平发布 

    消息公平分发,谁有本事(性能高)你多发,看队列有多少消息channel.basic_qos(prefetch_count=1),我这里有一条消息就先别给我发

    channel.basic_qos(prefetch_count=1)  

    生产者端

    # !/usr/bin/env python
    # _*_coding:utf-8_*_
    # Author:Joker
    
    
    import pika
    
    username = 'joker'
    pwd = '123456'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKET
    channel = connection.channel() # 建立通道
    
    # 管道里面声明queue
    channel.queue_declare(queue='hello2',durable=True) # 持久化队列
    
    # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    channel.basic_publish(exchange='',
                          routing_key='hello2',  # QUEUE名字
                          body='Hello World!',
                          properties=pika.BasicProperties( # 消息持久化
                              delivery_mode=2,  # make message persistent
                          )
                          )  # 消息内容
    print(" [x] Sent 'Hello World!'")
    connection.close()

    消费者端

    # !/usr/bin/env python
    # _*_coding:utf-8_*_
    # Author:Joker
    
    
    import pika,time
    
    username = 'joker'
    pwd = '123456'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='remoteip',port=5672,credentials=user_pwd))
    channel = connection.channel()
    
    # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
    # We could avoid that if we were sure that the queue already exists. For example if send.py program
    # was run before. But we're not yet sure which program to run first. In such cases it's a good
    # practice to repeat declaring the queue in both programs.
    channel.queue_declare(queue='hello2',durable=True) # 无法保证生产消费谁先运行,为了不出错可以自己声明,durable=True持久化队列
    
    
    def callback(ch, method, properties, body):
        print(ch,method,properties)
        # <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('192.168.0.139', 49993)->('60.205.188.107', 5672) params=<ConnectionParameters host=60.205.188.107 port=5672 virtual_host=/ ssl=False>>>>
        # <Basic.Deliver(['consumer_tag=ctag1.e67a54d9b91a40ab82647f2dafa9d9ba', 'delivery_tag=2', 'exchange=', 'redelivered=False', 'routing_key=hello'])>
        # <BasicProperties(['delivery_mode=2'])>
        # 通道的内存对象,链接的消息,消息属性见下面
        # time.sleep(30)
        print(" [x] Received %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag) # 手动跟服务端确认,devlivery_tag是信息标签,一般是第一条是1,第二条是2
    
    
    channel.basic_qos(prefetch_count=1) # 如果我这里还有消息就先别发给我
    channel.basic_consume(# 消费消息
                          callback,  # 如果收到消息,就调用CALLBACK函数来处理消息
                          queue='hello2', # 从哪个队列收消息
                          # no_ack=True) # 不会给服务端发消息,是否处理完
                            )
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming() # 永远收下去

    Propertie

    content_type : 消息内容的类型
    content_encoding: 消息内容的编码格式
    priority: 消息的优先级
    correlation_id:关联id
    reply_to: 用于指定回复的队列的名称
    expiration: 消息的失效时间
    message_id: 消息id
    timestamp:消息的时间戳
    type: 类型
    user_id: 用户id
    app_id: 应用程序id
    cluster_id: 集群id
    

      

    PublishSubscribe(消息发布订阅) 

    我们在之前学习的是1对1的消息发送接收,也就是消息只能发送到指定的queue,如果你想发送的消息让所有的queue收到,就要用到exchage了

    Exchange在定义的时候有类型的,以决定到底是哪些queue符合条件,可以接收消息

    fanout: 所有bind到此exchange的queue都可以接收消息

    EXCHANGE FANDOUT 纯广播,因为广播的原因,不会帮你保留消息,消费者先启动才能收到消息,每个人都能收到

    消息publisher

    # !/usr/bin/env python
    # _*_coding:utf-8_*_
    # Author:Joker
    
    
    import pika
    import sys
    
    username = 'joker'
    pwd = '123456'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKET
    channel = connection.channel() # 建立通道
    
    channel.exchange_declare(exchange='logs',exchange_type='fanout')   # 广播不需要写Q
    
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    
    channel.basic_publish(exchange='logs',
                          routing_key='', # Q 名
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()

    消息subscriber

    # !/usr/bin/env python
    # _*_coding:utf-8_*_
    # Author:Joker
    
    
    import pika
    
    username = 'joker'
    pwd = '123456'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKET
    channel = connection.channel() # 建立通道
    
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    
    result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    queue_name = result.method.queue
    print(queue_name) # 随机
    
    channel.queue_bind(exchange='logs', # 绑定Q到转发器EXCHAGE上
                       queue=queue_name) #
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    
    
    

    有选择的接收消息exchage type=direct

    队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列  

    publisher

    # !/usr/bin/env python
    # _*_coding:utf-8_*_
    # Author:Joker
    import pika import sys username = 'joker' pwd = '123456' user_pwd = pika.PlainCredentials(username, pwd) connection = pika.BlockingConnection(pika.ConnectionParameters( host='remoteip',port=5672,credentials=user_pwd)) # 建立SOCKET channel = connection.channel() # 建立通道 channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close() # import sys # print(sys.argv[0]) # /Users/liqianlong/Desktop/Django project/kkk/开启py之旅/消息队列/7DIRECT_PUBLISHER.py

    subscriber

    # !/usr/bin/env python
    # _*_coding:utf-8_*_
    # Author:Joker
    
    
    import pika
    import sys
    
    username = 'joker'
    pwd = '123456'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKET
    channel = connection.channel() # 建立通道
    
    channel.exchange_declare(exchange='direct_logs',
                             exchange_type='direct')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    severities = sys.argv[1:]   # 获取脚本后面跟的参数,WARNING,ERROR,INFO ..
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
    
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()  

    更细致的消息过滤exchage type=topic

    publisher

    # !/usr/bin/env python
    # _*_coding:utf-8_*_
    # Author:Joker
    
    import pika
    import sys
    
    username = 'joker'
    pwd = '123456'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKET
    channel = connection.channel() # 建立通道
    
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()

    subscriber

    # !/usr/bin/env python
    # _*_coding:utf-8_*_
    # Author:Joker
    
    import pika
    import sys
    
    username = 'joker'
    pwd = '123456'
    user_pwd = pika.PlainCredentials(username, pwd)
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='remoteip',port=5672,credentials=user_pwd))  # 建立SOCKET
    channel = connection.channel() # 建立通道
    
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    binding_keys = sys.argv[1:] # 监测的文件匹配
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
        sys.exit(1)
    
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    

    To receive all the logs run:

    python receive_logs_topic.py "#"  可以收取所有的信息
    

    To receive all logs from the facility "kern":

    python receive_logs_topic.py "kern.*"
    

    Or if you want to hear only about "critical" logs:

    python receive_logs_topic.py "*.critical"
    

    You can create multiple bindings:

    python receive_logs_topic.py "kern.*" "*.critical"
    

    And to emit a log with a routing key "kern.critical" type:

    python emit_log_topic.py "kern.critical" "A critical kernel error"

    Remote procedure call(RPC) 

    远程调用方法执行,SNMP简单网络管理协议,发一条执行返回结果就是一个简单的RPC,实现的就是服务器端也是客户端双向

    server

    #_*_coding:utf-8_*_
    __author__ = 'joker lii'
    import pika
    import time
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
     
    channel = connection.channel()
     
    channel.queue_declare(queue='rpc_queue')
     
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)
     
    def on_request(ch, method, props, body):
        n = int(body)
     
        print(" [.] fib(%s)" % n)
        response = fib(n)
     
        ch.basic_publish(exchange='', # 结果发挥给客户端
                         routing_key=props.reply_to, # props指的就是客户端里面的replay_to的q
                         properties=pika.BasicProperties(correlation_id = 
                                                             props.correlation_id), # 客户端的correlation_id
                         body=str(response))
        ch.basic_ack(delivery_tag = method.delivery_tag) # 确保消息消费了
     
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='rpc_queue')
     
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    

    client

    import pika
    import uuid
    import time class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) # socket self.channel = self.connection.channel() # 通道 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 随机q self.channel.basic_consume(self.on_response, no_ack=True, # on_response 回调函数 queue=self.callback_queue) # 随机q def on_response(self, ch, method, props, body): # 消息收到后做了些什么 if self.corr_id == props.correlation_id: # 先判断自己当前的id和服务器端的id是否一样,保证消息一对一 self.response = body # 这里将收到的消息赋值response,看是不是none def call(self, n): self.response = None # response,谁会将这个none改为true,因为none就会一直收 self.corr_id = str(uuid.uuid4()) # 唯一的随机一串数字
    self.channel.basic_publish(exchange='', # 发消息 routing_key='rpc_queue', # rpc_queue properties=pika.BasicProperties( # 消息持久化 reply_to = self.callback_queue, # 让服务器端执行完命令返回这个q里面 correlation_id = self.corr_id, # 唯一的随机一串数字 ), body=str(n)) # 发的消息
    while self.response is None: # response为none就会一直收 self.connection.process_data_events() # 非阻塞版的start_consuming(),收到消息触发回调函数,没有收到继续往下走
           print('no msg...')
    time.sleep(0.5)  return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) # 调用call方法 print(" [.] Got %r" % response)

     

  • 相关阅读:
    计算机一些常见名词解释
    [MSF]server/capture/http_javascript_keylogger键盘记录
    .net(C#)访问Oracle数据库的几种免安装组件的对比
    C# UserControl 判断是否是设计模式中
    Python_cmd的各种实现方法及优劣(subprocess.Popen, os.system和commands.getstatusoutput)
    python 怎么启动一个外部命令程序, 并且不阻塞当前进程
    创建注记图层C# IFeatureWorkspaceAnno
    VisualSVN Server 导入已存在的库
    带您了解Oracle层次查询
    win7系统使用engine进行开发报错,“未能加载文件或程序集”
  • 原文地址:https://www.cnblogs.com/jokerbj/p/9436910.html
Copyright © 2011-2022 走看看