zoukankan      html  css  js  c++  java
  • RabbitMQ----整理

    ------------------------------------------------------------------RabbitMQ------------------------------------------------------------------------------
    RabbitMQ是一个在AMQ基础上完整的,可复用的消息系统。-------------------遵循Mozilla Public License开源协议。
    	概念:
    		Exchange:-----------------------------------------消息交换机,它指定消息按什么规则,路由到哪个队列。
    	  Queue:--------------------------------------------消息队列载体,每个消息都会被投入到一个或多个队列。
    	  Binding:------------------------------------------绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
    	  Routing Key:--------------------------------------路由关键字,exchange根据这个关键字进行消息投递。
    	  vhost:--------------------------------------------虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
    	  producer:-----------------------------------------消息生产者,就是投递消息的程序。
    	  consumer:-----------------------------------------消息消费者,就是接受消息的程序。
    	  channel:------------------------------------------消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
    	MQ:全称为Message Queue-------------------------------------------消息队列(MQ)是一种应用程序的通信方法。
    		1:应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,无需专用链接来链接他们。
    		2:消息传递指的是:程序之间通过在--消息--中发送数据进行通信,而不是直接调用彼此来通信,直接调用通常是用于--:诸如诸如远程过程调用的技术。
    		3:排队指的是:应用程序通过队列来通信。
    		4:队列的使用除去了接收和发送同时执行放入要求。
    	安装:
    		安装配置epel源:------------------------------------------$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
    		安装erlang:----------------------------------------------$ yum -y install erlang
    		安装RabbitMQ:--------------------------------------------$ yum -y install rabbitmq-server
    	注意:service rabbitmq-server start/stop
    	安装API:
    			pip install pika//easy_install pika
    			https://pypi.python.org/pypi/pika---------------------源码
    	基于Queue实现生产者消费者模型:
    		#!/usr/bin/env python
    		# -*- coding:utf-8 -*-
    		import Queue
    		import threading
    
    
    		message = Queue.Queue(10)
    
    
    		def producer(i):
    			while True:
    				message.put(i)
    
    
    		def consumer(i):
    			while True:
    				msg = message.get()
    
    
    		for i in range(12):
    			t = threading.Thread(target=producer, args=(i,))
    			t.start()
    
    		for i in range(10):
    			t = threading.Thread(target=consumer, args=(i,))
    			t.start()
    	对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
    		!/usr/bin/env python
    		import pika
    		# ######################### 生产者 #########################
    		 
    		connection = pika.BlockingConnection(pika.ConnectionParameters(
    				host='localhost'))
    		channel = connection.channel()
    		 
    		channel.queue_declare(queue='hello')
    		 
    		channel.basic_publish(exchange='',
    							  routing_key='hello',
    							  body='Hello World!')
    		print(" [x] Sent 'Hello World!'")
    		connection.close()
    
    		#!/usr/bin/env python
    		import pika
    		 
    		# ########################## 消费者 ##########################
    		 
    		connection = pika.BlockingConnection(pika.ConnectionParameters(
    				host='localhost'))
    		channel = connection.channel()
    		 
    		channel.queue_declare(queue='hello')
    		 
    		def callback(ch, method, properties, body):
    			print(" [x] Received %r" % body)
    		 
    		channel.basic_consume(callback,
    							  queue='hello',
    							  no_ack=True)
    		 
    		print(' [*] Waiting for messages. To exit press CTRL+C')
    		channel.start_consuming()
    	1:acknowledgment 消息不丢失
    		no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
    		消费者:
    			import pika
    			connection = pika.BlockingConnection(pika.ConnectionParameters(
    					host='10.211.55.4'))
    			channel = connection.channel()
    
    			channel.queue_declare(queue='hello')
    
    			def callback(ch, method, properties, body):
    				print(" [x] Received %r" % body)
    				import time
    				time.sleep(10)
    				print 'ok'
    				ch.basic_ack(delivery_tag = method.delivery_tag)
    
    			channel.basic_consume(callback,
    								  queue='hello',
    								  no_ack=False)
    
    			print(' [*] Waiting for messages. To exit press CTRL+C')
    			channel.start_consuming()
    	2:durable   消息不丢失
    		生产者:
    			#!/usr/bin/env python
    			import pika
    			connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    			channel = connection.channel()
    
    			# make message persistent
    			channel.queue_declare(queue='hello', durable=True)
    
    			channel.basic_publish(exchange='',
    								  routing_key='hello',
    								  body='Hello World!',
    								  properties=pika.BasicProperties(
    									  delivery_mode=2, # make message persistent
    								  ))
    			print(" [x] Sent 'Hello World!'")
    			connection.close()
    		消费者:
    			#!/usr/bin/env python
    			# -*- coding:utf-8 -*-
    			import pika
    
    			connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    			channel = connection.channel()
    
    			# make message persistent
    			channel.queue_declare(queue='hello', durable=True)
    
    
    			def callback(ch, method, properties, body):
    				print(" [x] Received %r" % body)
    				import time
    				time.sleep(10)
    				print 'ok'
    				ch.basic_ack(delivery_tag = method.delivery_tag)
    
    			channel.basic_consume(callback,
    								  queue='hello',
    								  no_ack=False)
    
    			print(' [*] Waiting for messages. To exit press CTRL+C')
    			channel.start_consuming()
    	3:消息获取顺序默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
    		channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列	
    		#!/usr/bin/env python
    		# -*- coding:utf-8 -*-
    		import pika
    
    		connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    		channel = connection.channel()
    
    		# make message persistent
    		channel.queue_declare(queue='hello')
    
    
    		def callback(ch, method, properties, body):
    			print(" [x] Received %r" % body)
    			import time
    			time.sleep(10)
    			print 'ok'
    			ch.basic_ack(delivery_tag = method.delivery_tag)
    
    		channel.basic_qos(prefetch_count=1)
    
    		channel.basic_consume(callback,
    							  queue='hello',
    							  no_ack=False)
    
    		print(' [*] Waiting for messages. To exit press CTRL+C')
    		channel.start_consuming()
    	4:发布订阅
    		发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,
    		会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
    		exchange type = fanout
    		发布者:
    			#!/usr/bin/env python
    			import pika
    			import sys
    
    			connection = pika.BlockingConnection(pika.ConnectionParameters(
    					host='localhost'))
    			channel = connection.channel()
    
    			channel.exchange_declare(exchange='logs',
    									 type='fanout')
    
    			message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    			channel.basic_publish(exchange='logs',
    								  routing_key='',
    								  body=message)
    			print(" [x] Sent %r" % message)
    			connection.close()
    		订阅者:	
    			#!/usr/bin/env python
    			import pika
    
    			connection = pika.BlockingConnection(pika.ConnectionParameters(
    					host='localhost'))
    			channel = connection.channel()
    
    			channel.exchange_declare(exchange='logs',
    									 type='fanout')
    
    			result = channel.queue_declare(exclusive=True)
    			queue_name = result.method.queue
    
    			channel.queue_bind(exchange='logs',
    							   queue=queue_name)
    
    			print(' [*] Waiting for logs. To exit press CTRL+C')
    
    			def callback(ch, method, properties, body):
    				print(" [x] %r" % body)
    
    			channel.basic_consume(callback,
    								  queue=queue_name,
    								  no_ack=True)
    
    			channel.start_consuming()
    	5:关键字发送	
    		exchange type = direct
    		之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,
    		发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
    		消费者:
    			#!/usr/bin/env python
    			import pika
    			import sys
    
    			connection = pika.BlockingConnection(pika.ConnectionParameters(
    					host='localhost'))
    			channel = connection.channel()
    
    			channel.exchange_declare(exchange='direct_logs',
    									 type='direct')
    
    			result = channel.queue_declare(exclusive=True)
    			queue_name = result.method.queue
    
    			severities = sys.argv[1:]
    			if not severities:
    				sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
    				sys.exit(1)
    
    			for severity in severities:
    				channel.queue_bind(exchange='direct_logs',
    								   queue=queue_name,
    								   routing_key=severity)
    
    			print(' [*] Waiting for logs. To exit press CTRL+C')
    
    			def callback(ch, method, properties, body):
    				print(" [x] %r:%r" % (method.routing_key, body))
    
    			channel.basic_consume(callback,
    								  queue=queue_name,
    								  no_ack=True)
    
    			channel.start_consuming()
    		生产者:
    			#!/usr/bin/env python
    			import pika
    			import sys
    
    			connection = pika.BlockingConnection(pika.ConnectionParameters(
    					host='localhost'))
    			channel = connection.channel()
    
    			channel.exchange_declare(exchange='direct_logs',
    									 type='direct')
    
    			severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    			message = ' '.join(sys.argv[2:]) or 'Hello World!'
    			channel.basic_publish(exchange='direct_logs',
    								  routing_key=severity,
    								  body=message)
    			print(" [x] Sent %r:%r" % (severity, message))
    			connection.close()
    	6:模糊匹配
    		exchange type = topic
    		在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,
    		匹配成功,则将数据发送到指定队列。
    		#  表示可以匹配 0 个 或 多个 单词
    		*  表示只能匹配 一个 单词
    		发送者路由值              队列中
    		old.boy.python          old.*  -- 不匹配
    		old.boy.python          old.#  -- 匹配
    		消费者:	
    			#!/usr/bin/env python
    			import pika
    			import sys
    
    			connection = pika.BlockingConnection(pika.ConnectionParameters(
    					host='localhost'))
    			channel = connection.channel()
    
    			channel.exchange_declare(exchange='topic_logs',
    									 type='topic')
    
    			result = channel.queue_declare(exclusive=True)
    			queue_name = result.method.queue
    
    			binding_keys = sys.argv[1:]
    			if not binding_keys:
    				sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
    				sys.exit(1)
    
    			for binding_key in binding_keys:
    				channel.queue_bind(exchange='topic_logs',
    								   queue=queue_name,
    								   routing_key=binding_key)
    
    			print(' [*] Waiting for logs. To exit press CTRL+C')
    
    			def callback(ch, method, properties, body):
    				print(" [x] %r:%r" % (method.routing_key, body))
    
    			channel.basic_consume(callback,
    								  queue=queue_name,
    								  no_ack=True)
    
    			channel.start_consuming()
    		生产者:
    			#!/usr/bin/env python
    			import pika
    			import sys
    
    			connection = pika.BlockingConnection(pika.ConnectionParameters(
    					host='localhost'))
    			channel = connection.channel()
    
    			channel.exchange_declare(exchange='topic_logs',
    									 type='topic')
    
    			routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    			message = ' '.join(sys.argv[2:]) or 'Hello World!'
    			channel.basic_publish(exchange='topic_logs',
    								  routing_key=routing_key,
    								  body=message)
    			print(" [x] Sent %r:%r" % (routing_key, message))
    			connection.close()
    	注意:
    		sudo rabbitmqctl add_user alex 123
    		# 设置用户为administrator角色
    		sudo rabbitmqctl set_user_tags alex administrator
    		# 设置权限
    		sudo rabbitmqctl set_permissions -p "/" alex '.''.''.'
    		# 然后重启rabbiMQ服务
    		sudo /etc/init.d/rabbitmq-server restart
    		# 然后可以使用刚才的用户远程连接rabbitmq server了。
    		------------------------------
    		credentials = pika.PlainCredentials("alex","123")
    
    		connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.14.47',credentials=credentials))
    	RabbitMQ源码安装-------------------------------------------------------------------------------
    		
    		官网地址:rabbitmq
    		http://www.rabbitmq.com/releases/rabbitmq-server/
    		官网地址:erlang
    		http://erlang.org/download/
    		一、概念:
    			 Broker:简单来说就是消息队列服务器实体。
    		   Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
    		   Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
    		   Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
    		   Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
    		   vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
    		   producer:消息生产者,就是投递消息的程序。
    		   consumer:消息消费者,就是接受消息的程序。
    		   channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
    
    		二、安装RabbitMQ
    			 CentOS:
    				  1.先安装erlang
    					   # PS: 注意安装操作首先要切换到root工作环节中
    					   # 在命令 如果$ 表示是普通用户,
    					   yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel 
    					   yum -y install ncurses-devel 
    					   yum install ncurses-devel 
    					   wget http://erlang.org/download/otp_src_19.3.tar.gz
    					   tar -xzvf otp_src_19.3.tar.gz
    					   cd otp_src_19.3
    					   ./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe 
    					   make && make install
    					   配置erlang环境
    					   vi /etc/profile  #在最后添加下文
    					   PATH=$PATH:/usr/local/erlang/bin
    					   使环境变量生效
    					   source /etc/profile
    					   测试一下是否安装成功,在控制台输入命令erl
    					   crt+z 退出
    				  2.安装rabbitmq
    			
    					   wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.8/rabbitmq-server-3.5.8.tar.gz
    					   tar -zxvf rabbitmq-server-3.5.8.tar.gz
    					   cd abbitmq-server-3.5.8
    					   make
    					   make TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man DOC_INSTALL_DIR=/usr/local/rabbitmq/doc install
    					   配置erlang环境
    					   vi /etc/profile  #在最后添加下文
    					   PATH=$PATH:/usr/local/erlang/bin:/usr/local/rabbitmq/sbin
    					   使环境变量生效
    					   source /etc/profile
    				  3. 启动:rabbitmq-server
    						rabbitmq-server start
    				  4. 检查服务启动状态
    						[root@node01 ~]# netstat -lnpt|grep beam
    						tcp        0      0 0.0.0.0:5672                0.0.0.0:*                   LISTEN      19733/beam          
    						tcp        0      0 0.0.0.0:25672               0.0.0.0:*                   LISTEN      19733/beam          
    						tcp        0      0 0.0.0.0:15672               0.0.0.0:*                   LISTEN      19733/beam          
    						[root@node01 ~]# 
    						#  表示启动成功
    		三、管理命令
    			   启动:rabbitmq-server start
    			   关闭:rabbitmqctl stop
    			   状态:rabbitmqctl status
    
    		四、插件
    			启动web管理插件
    				rabbitmq-plugins enable rabbitmq_management
    				如果报错
    				错误解决:
    					Error: {cannot_write_enabled_plugins_file,"/etc/rabbitmq/enabled_plugins",            enoent}
    					mkdir /etc/rabbitmq
    					重新启动输入地址:localhost:15672,帐号默认为guest,密码guest,此帐号默认只能在本机访问。不建议打开远程访问。你可以创建一个帐户,并设置可以远程访问的角色进行访问。
    					如:
    					rabbitmqctl add_user supery 123				# 创建用户supery
    					
    					rabbitmqctl  set_user_tags  supery administrator		# 给创建的supery用户administrator角色
    
    		五、用户管理
    			 默认的guest帐户相当于root帐户
    			 rabbitmqctl add_user username password 添加帐户
    			 rabbitmqctl change_password username newpassword 修改密码
    			 rabbitmqctl delete_user username 删除帐户
    			 rabbitmqctl list_users 列出所有帐户
    			 rabbitmqctl  set_user_tags  User  Tag 设置角色(administrator、monitoring、policymaker、management、其它)
    			 立即生效,不需重启
    
    		六、创建配置文件
    		[root@node01 ~]# ll /etc/rabbitmq/
    		total 8
    		-rw-r--r-- 1 root root 23 Mar  5 10:07 enabled_plugins
    		-rw-r--r-- 1 root root 51 Mar  5 10:12 rabbitmq.config
    		[root@node01 ~]# cat /etc/rabbitmq/rabbitmq.config 
    		[{rabbit, [{loopback_users, ["root","supery"]}]}].
    
    		操作步骤:
    			vi /etc/rabbitmq/rabbitmq.config
    			将[{rabbit, [{loopback_users, ["root","supery"]}]}]. 复制到文件中即可
    			
    			
    			
    			esc
    			:x  保存并退出
    			
    		七、重启rabbitmq-server并测试访问
    		rabbitmqctl stop
    		rabbitmqctl start
    		浏览器访问
    			1. 查看自己的服务器ip地址
    			ifconfig
    			# inet addr:172.24.129.3		===> 服务器ip地址
    			2. 浏览器访问
    			http://39.104.109.159:15672  	===> 输入用户密码登录即可
    			
    		登录成功!完成
    	 
    			   
    

      

  • 相关阅读:
    OA权限管理的实现(下)
    Eclipse及其插件介绍和下载(转)
    [转载]在Redhat Linux AS 4下实现软件RAID
    RAID磁盘阵列术语详解(转)
    [转载]关于"编译器失败,错误代码为128。"的解决方案
    Android的SurfaceView使用
    剖析 SurfaceView Callback以及SurfaceHolder
    android drawable 应用
    Android layout xml总结
    listView控件,adapter,以及其他控件的组合
  • 原文地址:https://www.cnblogs.com/w-s-l123/p/8525724.html
Copyright © 2011-2022 走看看