RabbitMQ依赖erlang,所以先安装erlang,然后再安装RabbitMQ;
先安装erlang,双击erlang的安装文件即可,然后配置环境变量:
ERLANG_HOME=D:Program Fileserl7.1
追加到path=%ERLANG_HOME%in;
验证erlang是否安装成功, 打开cmd命令窗口,进入erlang的bin路径,输入erl命令,如果出现如下提示,则说明erlang安装成功:
D:Program Fileserl7.1in>erl
Eshell V7.1 (abort with ^G)
再安装RabbitMQ,双击安装文件即可,安装完毕后, 设置环境变量:
RABBITMQ_SERVER=D:Program FilesRabbitMQ Server
abbitmq_server-3.5.6
追加到path=%RABBITMQ_SERVER%sbin;
验证RabbitMQ是否安装成功,在CMD命令窗口输入:
C:Windowssystem32>rabbitmq-service
安装好后,我们进入rabbitMQ安装目录下的sbin目录,在目录下shift+右键打开命令行 【必须要进行这一步,不然中途会发现使用不了,连接不上.使用版本rabbitmq-server-3.7.9.】
使用rabbitmq-plugins.bat enable rabbitmq_management开启网页管理界面,然后重启rabbitMQ
远程连接需要 添加用户,可在网页admin页面添加,,添加之后一定要给权限,不给权限的话还是不行,,详细见【客户端-开始任务完成】
#添加用户
#rabbitmqctl add_vhost vh #rabbitmqctl add_user test test #rabbitmqctl set_user_tags test management #rabbitmqctl set_permissions -p vh test ".*" ".*" ".*"
def start(self): disconnected = True while disconnected: try: disconnected = False self.channel.start_consuming() # blocking call except pika.exceptions.ConnectionClosed: # when connection is lost, e.g. rabbitmq not running logging.error("Lost connection to rabbitmq service on manager") disconnected = True time.sleep(10) # reconnect timer logging.info("Trying to reconnect...") self.connect() self.clear_message_queue() #could this make problems if the manager replies too fast? # 参照文档 https://blog.csdn.net/csdn_am/article/details/79894662
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
1:安装RabbitMQ需要先安装Erlang语言开发包。下载地址 http://www.erlang.org/download.html 在win7下安装Erlang最好默认安装。
设置环境变量ERLANG_HOME= C:Program Fileserlx.x.x
添加到PATH %ERLANG_HOME%in;
2:安装RabbitMQ 下载地址 http://www.rabbitmq.com/download.html 安装教程:http://www.rabbitmq.com/install-windows.html
设置环境变量RABBITMQ_SERVER=C:Program Files
abbitmq_server-x.x.x。
添加到PATH %RABBITMQ_SERVER%sbin;
找到环境变量中的path变量:
双击path,在其后面增加:;%RABBITMQ_SERVER%sbin (注意前面的分号),然后确定即可
现在打开windows命令行(“cmd”),输入rabbitmq-service如果出现如下所示提示,即表示环境变量配置成功。
3:进入%RABBITMQ_SERVER%sbin 目录以管理员身份运行 rabbitmq-plugins.bat
rabbitmq-plugins.bat enable rabbitmq_management
安装完成之后以管理员身份启动 rabbitmq-service.bat
rabbitmq-service.bat stop
rabbitmq-service.bat install
rabbitmq-service.bat start
4:浏览器访问localhost:55672 默认账号:guest 密码:guest
5. Rabbit还自带监控功能.
cmd进到sbin目录,键入rabbitmq-plugins enable rabbitmq_management启用监控管理,然后重启Rabbitmq服务器。 打开网址http://localhost:55672,用户名和密码都是guest。
6. 现在打开浏览器,输入:http://localhost:15672/ ,如果出现以下页面,则表示服务器配置成功。
默认用户名为guest,密码:guest
如果没有出现以上页面,尝试在windows命令行中输入(以管理员方式运行):
rabbitmq-plugins enable rabbitmq_management
然后运行下面的命令来安装:
rabbitmq-service stop
rabbitmq-service install
rabbitmq-service start
rabbitMQ 生产者工作模式 import pika 1 创建socket connection = pika.BlockingConnextion(pika.ConnectionParameters('localhost')) 2 声明一个管道 channel = connection.channel() 3 声明queue channel.queue_declare(queue='hello',durable=True) durable:声明是持久化的队列,默认是队列存在内在中的,服务崩了之后,是不会恢复的 4 发消息 channel.basic_publish(exchange='', routing_key='hello', # 队列的名字 body='hello word!' #消息内容 properties=pika.BasicProperties(delivery_mode=2,)) properties #消息持久化,主要语句 delivery_mode 5 发送完毕,关闭队列 connection.close() 消费者工作模式 1 创建socket connection = pika.BlockingConnextion(pika.ConnectionParameters('localhost')) 2 声明一个管道 channel = connection.channel() 3 声明queue channel.queue_declare(queue='hello') 4 消费消息 def callback(ch,method,properties,body): print(' x Received %r'%body) # ch 就是管道的内存对象地址 # method 就是包含发送信息的列表, # properties # 消息处理完,需要手动跟服务端确认 # ch.basic_ack(delivery_tag=method.delivery_tag) #加上下面这个相当于负载匀衡的权重值,处理慢的加上这个 channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, #如果收到消息调用处理,回调函数 queue='hello',#队列的名字 no_ack=True #消息处理完了,表示不确认,一般不加,处理完了,由客户端来向服务端确认(a_1.basic_ack(delivery_tag=a_2.delivery_tag)
) ) 5 启动就一直运行,没有消息就阻塞 channel.start_consuming() exchange: 转发器 fanout: 所有bind到此exchange的queue都可以接收消息 direct:通过routingkey和exchange决定的那个唯一的qu eue可以接收消息 topic:所有符合routingKey(此时可以是一个表达式)的r outingKey所bind的queue可以接收消息 headers:通过headers来决定把消息发给哪些queue
import pika conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channll = conn.channel() channll.queue_declare('url',durable=True) for i in range(1,10): channll.publish(exchange='', routing_key='url', body='https://wh.lianjia.com/ershoufang/pg%s/'%i) conn.close()
# python 3.7 import pika import re import requests,pymysql credential_s = pika.PlainCredentials('root','liu') conn = pika.BlockingConnection(pika.ConnectionParameters(host='176.215.44.242',credentials=credential_s)) channel = conn.channel() channel.queue_declare('url',durable=True) class Lianjia(object): def __init__(self,static_url=''): self.headers = {'Host':'wh.lianjia.com', 'Referer': 'https://wh.lianjia.com/ershoufang/', 'Upgrade-Insecure-Requests': '1', 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36'} if not static_url: self.static_url='https://wh.lianjia.com/ershoufang/pg%s/' else: self.static_url = static_url self.patten='data-sl="">.*?data-sl="">(.*?)</a>.*?data-log_index=".*?" data-el=".*?">(.*?)</a>(.*?)</div>.*?</span>.*?<a href=".*?" target="_blank">(.*?)</a></div>.*?<.*?</span>.*?<span>(.*?)</span>' self.conn = pymysql.connect(host='176.215.44.242', user='root', password="liu", database='test', port=3306, charset='utf8') self.cur = self.conn.cursor() def get_page(self): res = requests.get(url=self.static_url,headers=self.headers,proxies={'http':'http://58.53.128.83:3128'}) if res.status_code==200: self.write_res(res.text) def write_res(self,text): for i in re.findall(self.patten,text): a_1 = i[0] # 标题 a_2 = i[1] # 小区名 a_3 = i[2] # 详情 a_4 = i[3] # 位置 a_5 =i[4] #价格 sql = 'insert into lianjia(b_t,x_q_1,x_q_2,w_z,j_g) values(%s,%s,%s,%s,%s)' self.cur.execute(sql, [a_1, a_2, a_3, a_4, a_5]) self.conn.commit() def callback(a_1,a_2,a_3,body): print('接收到body',body) l = Lianjia(static_url=body) l.get_page() a_1.basic_ack(delivery_tag=a_2.delivery_tag) print(body,'完成,') channel.basic_consume(callback, queue='url' ) channel.start_consuming()
fanout:广播模式(其他地方一样) 生产者需要更改的地方: 1 channel.exchange_declare(exchange='logs', type='fanout') #定义转发器的名字,在消费端上需要bind 2 channel.basic_publish(exchange='logs', routing_key='', body='消息随便来') 消费者需要更改的地方: 1 channel.exchange_declare(exchange='logs', type='fanout') 2 result = channel.queue_declare(exclusive=True)#排他,唯一的,不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 3 queue_name = result.method.queue#然后使用queue_name来进行操作,然后需要绑定转发器 4 channel.queue_bind(exchange='logs', queue=queue_name)#绑定 direct : 生产者: 1 channel.exchange_declare(exchange='direct_logs', type='direct') 2 severity = 'info'|'warning'|'error' #级别 3 channel.basic_publish(exchange='logs', routing_key=severity, body='消息随便来') 消费者: 1 severitys = ['info','warning','error'] 2 result = channel.queue_declare(exclusive=True) 3 queue_name = result.method.queue 4 for severity in severitys: channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)#循环列表去绑定 topic: '#'是所有都收 生产者: 跟direct基本一样, 1 channel.exchange_declare(exchange='direct_logs', type='topic') 消费者: 跟direct基本一样, 1 需要更改exchange='topic'
先暂时写到这里吧,下次更新其他模式