zoukankan      html  css  js  c++  java
  • RabitMQ使用_python

    一、RabbitMQ的介绍

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

    主要作用:每个进程(跨语言,系统)之间的中间代理。

    二、常用命令与授权角色说明

    启动监控管理器:rabbitmq-plugins enable rabbitmq_management
    关闭监控管理器:rabbitmq-plugins disable rabbitmq_management
    启动rabbitmq:rabbitmq-service start
    关闭rabbitmq:rabbitmq-service stop
    查看所有的队列:rabbitmqctl list_queues
    清除所有的队列:rabbitmqctl reset
    关闭应用:rabbitmqctl stop_app
    启动应用:rabbitmqctl start_app
    2、使用前需要添加用户,授权等
    用户和权限设置(后面用处)
    添加用户:rabbitmqctl add_user username password
    分配角色:rabbitmqctl set_user_tags username administrator
    新增虚拟主机:rabbitmqctl add_vhost  vhost_name
    将新虚拟主机授权给新用户:rabbitmqctl set_permissions -p vhost_name username '.*' '.*' '.*'
    角色说明
    none  最小权限角色
    management 管理员角色
    policymaker   决策者
    monitoring  监控
    administrator  超级管理员 

    三、exchange交换器的四种类型

    1、fanout:分发给exchange绑定的所有queu中

     2、direct:把消息路由到那些binding key与routing key完全匹配的Queue中

     3、topic:与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:(过滤字符串)

     4、header

    四、简单使用例子

    1、RPC进行批量主机执行命令

    生产者给消费者消息之后,执行之后结果返回给生产者,这种模式叫做RPC(remote procedure call  远程过程调用)

     client:

    start:

     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 #Author:chenxz
     4 import os,sys
     5 Base_dir=os.path.dirname(os.path.dirname(__file__))
     6 sys.path.append(Base_dir)
     7 from core import main
     8 
     9 if __name__ == '__main__':
    10     obj=main.Main()
    11     obj.interactive()
    View Code

    main:

     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 #Author:chenxz
     4 import threading
     5 import os,sys
     6 Base_dir=os.path.dirname(os.path.dirname(__file__))
     7 sys.path.append(Base_dir)
     8 from core import producer
     9 import uuid
    10 
    11 class Main(object):
    12     def __init__(self):
    13         self.information={}
    14     def interactive(self):
    15         '''
    16         每进来一个任务新建一个多线程
    17         :return:
    18         '''
    19         while True:
    20             cmd_inp=input('>>').strip()
    21             if not cmd_inp:
    22                 continue
    23             t=threading.Thread(target=self.dealwith,args=(cmd_inp,))
    24             t.start()
    25             # cmd_list = cmd_inp.split('"')
    26             # cmd=cmd_list()
    27             # print(cmd_list)
    28     def dealwith(self,cmd_inp):
    29         '''
    30         处理输入的命令,并映射到对应的方法(run check check_all)
    31         :param cmd_inp:
    32         :return:
    33         '''
    34         cmd_list = cmd_inp.split('"')
    35 
    36         operate=cmd_list[0].strip().split(' ')[0]
    37         print(operate)
    38         if hasattr(self,operate):
    39             getattr(self,operate)(cmd_list)
    40         else:
    41             print('命令输入错误:%s'%cmd_inp)#
    42 
    43     def run(self,cmd_list):
    44         cmd=cmd_list[1].strip()
    45         hosts_list=cmd_list[2].strip().split(' ')[1:]
    46         print(cmd,hosts_list)
    47         for ip in hosts_list:
    48             #producer_obj=producer.Producer()
    49             #response=producer.send(cmd,ip)
    50             task_id=uuid.uuid4()
    51             response =['queue_123','12345678']
    52             self.information[task_id]={'host':ip,'cmd':cmd,'corr_id':response[1],'callback_queue':response[0]}
    53             print( self.information)
    54 
    55     def check_all(self,cmd_list):
    56         print('存在以下任务:')
    57         for i in self.information:
    58             print("task_id(%s): %s 主机执行 %s 命令"%(i,self.information[i]['host'],self.information[i]['cmd']))
    59 
    60     def check_task(self,cmd_list):
    61         task_id=cmd_list[0].strip().split(' ')[1]
    62         print(task_id)
    63         callback_queue=self.information[task_id]['callback_queue']
    64         corr_id=self.information[task_id]['corr_id']
    65         consume_obj=producer()
    66         response=consume_obj.receive(callback_queue,corr_id)
    67         print(response.decode())
    68         del self.information[task_id]
    69 if __name__ == '__main__':
    70     obj=Main()
    71     obj.interactive()
    View Code

    producer:

     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 #Author:chenxz
     4 import pika
     5 import uuid
     6 
     7 class Producer(object):
     8     def __init__(self):
     9         self.connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.106.25',credentials=pika.PlainCredentials('root','123')))
    10         self.channel=self.connection.channel()
    11     def send(self,body,queue_name):
    12         #设置返回时的队列名self.callback_queue和随机生成任务id号码self.corr_id
    13         result=self.channel.queue_declare(queue='',exclusive=False)
    14         self.callback_queue=result.method.queue
    15         self.corr_id=str(uuid.uuid4())
    16         #发送消息
    17         self.channel.publish(exchange='',
    18                              routing_key=queue_name,
    19                              properties=pika.BasicProperties(reply_to=self.callback_queue,correlation_id=self.corr_id),
    20                              body=body)
    21         return self.callback_queue,self.corr_id
    22 
    23     def receive(self,callback_queue,corr_id):
    24         self.callback_id=corr_id
    25         self.response=None
    26         self.channel.basic_consume(
    27             on_message_callback=self.callback,
    28             queue=callback_queue,
    29         )
    30         while not self.response:
    31             self.connection.process_data_events()
    32         return self.response
    33 
    34     def callback(self,ch,method,props,body):
    35         if self.callback_id==props.correlation_id
    36             self.response=body
    View Code

    server:

    start:

     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 #Author:chenxz
     4 
     5 import os,sys
     6 Base_dir=os.path.dirname(os.path.dirname(__file__))
     7 sys.path.append(Base_dir)
     8 from core import main
     9 
    10 if __name__ == '__main__':
    11     obj=main.consumer()
    12     obj.start()
    View Code

    main:

     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 #Author:chenxz
     4 import pika
     5 import os
     6 
     7 class consumer(object):
     8     def __init__(self):
     9         self.connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.106.25',credentials=pika.PlainCredentials('root','123')))
    10         self.channel=self.connection.channel()
    11     def start(self):
    12         #获取ip
    13         ip=123
    14         self.channel.queue_declare(queue=ip)
    15         self.channel.basic_consume(on_message_callback=self.callback,
    16                                    queue=ip,
    17                                    )
    18         self.channel.start_consuming()
    19     def callback(self,ch,method,props,body):
    20         corr_id=props.correlation_id
    21         callback_queue=props.reply_to
    22         #执行命令,返回结果
    23         response=self.handle(body)
    24         self.channel.basic_publish(exchange='',
    25                                    routing_key=callback_queue,
    26                                    properties=pika.BasicProperties(correlation_id=corr_id,
    27                                                                    ),
    28                                    body=response)
    29         ch.basic_ack(delivery_tag=method.delivery_tag)
    30 
    31     def handle(self,cmd):
    32         cmd=cmd.decode()
    33         message=os.popen(cmd).read()
    34         if not message:
    35             message='wrong cmd'
    36         return message
    View Code
    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    #Author:chenxz

    import os,sys
    Base_dir=os.path.dirname(os.path.dirname(__file__))
    sys.path.append(Base_dir)
    from core import main

    if __name__ == '__main__':
    obj=main.consumer()
    obj.start()
  • 相关阅读:
    什么是 rel="shortlink" ?
    HTML5 怎么兼容 XHTML
    Connection to `ssl://pecl.php.net:443' failed: mac系统
    记录小程序中经常犯的错误---在wxml中使用js方法(打脸不)
    从js对象数组中删除某一个对象
    彻底搞懂字符串提取方法 slice,substr,substring
    vue 项目中笔记-持续更新
    CF526G Spiders Evil Plan
    BZOJ2178 圆的面积并
    计算几何入门
  • 原文地址:https://www.cnblogs.com/chenxiaozan/p/12680481.html
Copyright © 2011-2022 走看看