zoukankan      html  css  js  c++  java
  • rabbit MQ 的环境及命令使用(一)

    
    

    RabbitMQ依赖erlang,所以先安装erlang,然后再安装RabbitMQ;

    
    

    先安装erlang,双击erlang的安装文件即可,然后配置环境变量:
    ERLANG_HOME=D:Program Fileserl7.1
    追加到path=%ERLANG_HOME%in;

    
    

    验证erlang是否安装成功, 打开cmd命令窗口,进入erlang的bin路径,输入erl命令,如果出现如下提示,则说明erlang安装成功:

    
    

    D:Program Fileserl7.1in>erl

    
    

    Eshell V7.1 (abort with ^G)

    
    

    再安装RabbitMQ,双击安装文件即可,安装完毕后, 设置环境变量:
    RABBITMQ_SERVER=D:Program FilesRabbitMQ Server abbitmq_server-3.5.6
    追加到path=%RABBITMQ_SERVER%sbin;

    
    

    验证RabbitMQ是否安装成功,在CMD命令窗口输入:

    
    

    C:Windowssystem32>rabbitmq-service

    安装好后,我们进入rabbitMQ安装目录下的sbin目录,在目录下shift+右键打开命令行   【必须要进行这一步,不然中途会发现使用不了,连接不上.使用版本rabbitmq-server-3.7.9.】

    使用rabbitmq-plugins.bat enable rabbitmq_management开启网页管理界面,然后重启rabbitMQ

    远程连接需要 添加用户,可在网页admin页面添加,,添加之后一定要给权限,不给权限的话还是不行,,详细见【客户端-开始任务完成】

    #添加用户

    #
    rabbitmqctl add_vhost vh #rabbitmqctl add_user test test #rabbitmqctl set_user_tags test management #rabbitmqctl set_permissions -p vh test ".*" ".*" ".*"
    
    
    def start(self):
            disconnected = True
            while disconnected:
                try:
                    disconnected = False
                    self.channel.start_consuming() # blocking call
                except pika.exceptions.ConnectionClosed: # when connection is lost, e.g. rabbitmq not running
                    logging.error("Lost connection to rabbitmq service on manager")
                    disconnected = True
                    time.sleep(10) # reconnect timer
                    logging.info("Trying to reconnect...")
                    self.connect()
                    self.clear_message_queue() #could this make problems if the manager replies too fast?
    
    # 参照文档 https://blog.csdn.net/csdn_am/article/details/79894662
    View Code---报错处理【长时间未调用断开处理】---这里使用的是死循环处理,


    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
    1:安装RabbitMQ需要先安装Erlang语言开发包。下载地址 http://www.erlang.org/download.html 在win7下安装Erlang最好默认安装。
          设置环境变量ERLANG_HOME= C:Program Fileserlx.x.x 
          添加到PATH  %ERLANG_HOME%in;
     
    2:安装RabbitMQ 下载地址 http://www.rabbitmq.com/download.html  安装教程:http://www.rabbitmq.com/install-windows.html
          设置环境变量RABBITMQ_SERVER=C:Program Files
    abbitmq_server-x.x.x。
          添加到PATH %RABBITMQ_SERVER%sbin;
     找到环境变量中的path变量:
          双击path,在其后面增加:;%RABBITMQ_SERVER%sbin (注意前面的分号),然后确定即可
     现在打开windows命令行(“cmd”),输入rabbitmq-service如果出现如下所示提示,即表示环境变量配置成功。
     
    3:进入%RABBITMQ_SERVER%sbin 目录以管理员身份运行 rabbitmq-plugins.bat
    rabbitmq-plugins.bat enable rabbitmq_management
         安装完成之后以管理员身份启动 rabbitmq-service.bat
    rabbitmq-service.bat stop
    rabbitmq-service.bat install
    rabbitmq-service.bat start
    
    
    4:浏览器访问localhost:55672  默认账号:guest  密码:guest
    
    
    5. Rabbit还自带监控功能. 
    cmd进到sbin目录,键入rabbitmq-plugins enable rabbitmq_management启用监控管理,然后重启Rabbitmq服务器。 打开网址http://localhost:55672,用户名和密码都是guest。 
    
    
    6. 现在打开浏览器,输入:http://localhost:15672/ ,如果出现以下页面,则表示服务器配置成功。 
    默认用户名为guest,密码:guest
    
    
    如果没有出现以上页面,尝试在windows命令行中输入(以管理员方式运行):
    rabbitmq-plugins enable rabbitmq_management
    然后运行下面的命令来安装:
    rabbitmq-service stop
    rabbitmq-service install
    rabbitmq-service start
    
    
    




    rabbitMQ 生产者工作模式
    import pika 1 创建socket connection = pika.BlockingConnextion(pika.ConnectionParameters('localhost')) 2 声明一个管道 channel = connection.channel() 3 声明queue channel.queue_declare(queue='hello',durable=True) durable:声明是持久化的队列,默认是队列存在内在中的,服务崩了之后,是不会恢复的 4 发消息 channel.basic_publish(exchange='', routing_key='hello', # 队列的名字 body='hello word!' #消息内容 properties=pika.BasicProperties(delivery_mode=2,)) properties #消息持久化,主要语句 delivery_mode 5 发送完毕,关闭队列 connection.close() 消费者工作模式 1 创建socket connection = pika.BlockingConnextion(pika.ConnectionParameters('localhost')) 2 声明一个管道 channel = connection.channel() 3 声明queue channel.queue_declare(queue='hello') 4 消费消息 def callback(ch,method,properties,body): print(' x Received %r'%body) # ch 就是管道的内存对象地址 # method 就是包含发送信息的列表, # properties # 消息处理完,需要手动跟服务端确认 # ch.basic_ack(delivery_tag=method.delivery_tag) #加上下面这个相当于负载匀衡的权重值,处理慢的加上这个 channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, #如果收到消息调用处理,回调函数 queue='hello',#队列的名字  no_ack=True #消息处理完了,表示不确认,一般不加,处理完了,由客户端来向服务端确认a_1.basic_ack(delivery_tag=a_2.delivery_tag)
      ) 5 启动就一直运行,没有消息就阻塞 channel.start_consuming() exchange: 转发器 fanout: 所有bind到此exchange的queue都可以接收消息 direct:通过routingkey和exchange决定的那个唯一的qu eue可以接收消息 topic:所有符合routingKey(此时可以是一个表达式)的r outingKey所bind的queue可以接收消息 headers:通过headers来决定把消息发给哪些queue
    import pika
    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channll = conn.channel()
    channll.queue_declare('url',durable=True)
    for i in range(1,10):
        channll.publish(exchange='',
                        routing_key='url',
                        body='https://wh.lianjia.com/ershoufang/pg%s/'%i)
    conn.close()
    View Code--lianjia_ser----服务端-生成10页__链家
    # python 3.7
    import pika
    import re
    import requests,pymysql
    credential_s = pika.PlainCredentials('root','liu')
    conn = pika.BlockingConnection(pika.ConnectionParameters(host='176.215.44.242',credentials=credential_s))
    channel = conn.channel()
    channel.queue_declare('url',durable=True)
    
    class Lianjia(object):
        def __init__(self,static_url=''):
            self.headers = {'Host':'wh.lianjia.com',
                            'Referer': 'https://wh.lianjia.com/ershoufang/',
                            'Upgrade-Insecure-Requests': '1',
                            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36'}
            if not static_url:
                self.static_url='https://wh.lianjia.com/ershoufang/pg%s/'
            else:
                self.static_url = static_url
            self.patten='data-sl="">.*?data-sl="">(.*?)</a>.*?data-log_index=".*?" data-el=".*?">(.*?)</a>(.*?)</div>.*?</span>.*?<a href=".*?" target="_blank">(.*?)</a></div>.*?<.*?</span>.*?<span>(.*?)</span>'
            self.conn = pymysql.connect(host='176.215.44.242', user='root', password="liu",
                                        database='test', port=3306, charset='utf8')
            self.cur = self.conn.cursor()
    
        def get_page(self):
            res = requests.get(url=self.static_url,headers=self.headers,proxies={'http':'http://58.53.128.83:3128'})
            if res.status_code==200:
                self.write_res(res.text)
    
        def write_res(self,text):
            for i in re.findall(self.patten,text):
                a_1 = i[0] # 标题
                a_2 = i[1] # 小区名
                a_3 = i[2] # 详情
                a_4 = i[3] # 位置
                a_5 =i[4]  #价格
                sql = 'insert into lianjia(b_t,x_q_1,x_q_2,w_z,j_g) values(%s,%s,%s,%s,%s)'
                self.cur.execute(sql, [a_1, a_2, a_3, a_4, a_5])
            self.conn.commit()
    
    
    
    def callback(a_1,a_2,a_3,body):
        print('接收到body',body)
        l = Lianjia(static_url=body)
        l.get_page()
        a_1.basic_ack(delivery_tag=a_2.delivery_tag)
        print(body,'完成,')
    
    channel.basic_consume(callback,
                          queue='url'
                          )
    channel.start_consuming()
    View Code--lianjia_cli----客户端-开始任务完成就开始下一条

    fanout:广播模式(其他地方一样) 生产者需要更改的地方: 1 channel.exchange_declare(exchange
    ='logs', type='fanout') #定义转发器的名字,在消费端上需要bind 2 channel.basic_publish(exchange='logs', routing_key='', body='消息随便来') 消费者需要更改的地方: 1 channel.exchange_declare(exchange='logs', type='fanout') 2 result = channel.queue_declare(exclusive=True)#排他,唯一的,不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 3 queue_name = result.method.queue#然后使用queue_name来进行操作,然后需要绑定转发器 4 channel.queue_bind(exchange='logs', queue=queue_name)#绑定 direct : 生产者: 1 channel.exchange_declare(exchange='direct_logs', type='direct') 2 severity = 'info''warning''error' #级别 3 channel.basic_publish(exchange='logs', routing_key=severity, body='消息随便来') 消费者: 1 severitys = ['info','warning','error'] 2 result = channel.queue_declare(exclusive=True) 3 queue_name = result.method.queue 4 for severity in severitys: channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)#循环列表去绑定 topic: '#'是所有都收 生产者: 跟direct基本一样, 1 channel.exchange_declare(exchange='direct_logs', type='topic') 消费者: 跟direct基本一样, 1 需要更改exchange='topic'

    先暂时写到这里吧,下次更新其他模式

  • 相关阅读:
    php遇到Allowed memory size of 134217728 bytes exhausted问题解决方法
    sql语句 两表关联查询计算数量
    mui 关键词查询
    fatal error C1189: #error : Building MFC application with /MD[d] (CRT dll version) requires MFC sha
    mui mui-control-item获得选中的标签
    mui横向滑动菜单
    淘宝客类别id大全
    (大数据工程师学习路径)第二步 Vim编辑器----高级功能入门
    (大数据工程师学习路径)第二步 Vim编辑器----查找替换
    (大数据工程师学习路径)第二步 Vim编辑器----Vim文档编辑
  • 原文地址:https://www.cnblogs.com/Skyda/p/10018332.html
Copyright © 2011-2022 走看看