zoukankan      html  css  js  c++  java
  • RabbitMQ 从入门到精通 (一)

    初识RabbitMQ

    RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用 Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的

    RabbitMQ的优点:

    • 开源、性能优秀、稳定性保障
    • 提供可靠性消息投递模式(confirm)、返回模式(return)
    • 与SpringAMQP完美的整合、API丰富
    • 集群模式丰富,表达式配置,HA模式,镜像队列模型
    • 保证数据不丢失的前提下做到高可靠性、可用性

    RabbitMQ官网

    RabbitMQ的整体架构:

     
    RabbitMQ的消息流转:

     

     

    AMQP

    AMQP全称: Advanced Message Queuing Protocol

    AMQP翻译: 高级消息队列协议

    AMQP定义:是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计

     
     

    AMQP核心概念:

    • Server:又称Broker,接受客户端的连接,实现AMQP实体服务
    • Connection:连接,应用程序与Broker的网络连接
    • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
    • Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体的内容
    • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。同一个Virtual Host里面不能有相同名称的Exchange或Queue
    • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
    • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
    • Routing key:一个路由规则,虚拟机可用它确定如何路由一个特定消息
    • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

     

     

    RabbitMQ的极速入门

    后台启动: ./rabbitmq start &

    关闭: ./rabbitmqctl stop

    节点状态: ./rabbitmqctl status

    管控台: http://ip:15672

     

     

    RabbitMQ生产消费快速入门:

    环境: springboot+jdk1.7+rabbitmq3.6.5 (Maven依赖配置)

     <parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.9.RELEASE</version>
      </parent>
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		
    		<dependency>
    			<groupId>com.rabbitmq</groupId>
    			<artifactId>amqp-client</artifactId>
    			<version>3.6.5</version>
    		</dependency>
    	</dependencies>
    

     

    public class Procuder {
    	public static void main(String[] args) throws Exception {
    		
    		//1.创建一个ConnectionFactory 并进行配置
    		ConnectionFactory connectionFactory = new ConnectionFactory();
    		connectionFactory.setHost("192.168.244.11");
    		connectionFactory.setPort(5672);
    		connectionFactory.setVirtualHost("/");
    		connectionFactory.setHandshakeTimeout(20000);
    		//2.通过连接工厂创建连接
    		Connection connection = connectionFactory.newConnection();
    		
    		//3.通过Connection 创建一个 Channel
    		Channel channel = connection.createChannel();
    	
    		/**
    		 * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
    		 * exchange:指定交换机 不指定 则默认 (AMQP default交换机) 通过routingkey进行匹配 
    		 * props 消息属性
    		 * body 消息体
    		 */
    		//4.通过Channel发送数据
    		for(int i = 0; i < 5; i++){
    		  System.out.println("生产消息:" + i);
    		  String msg = "Hello RabbitMQ" + i;
    	      channel.basicPublish("", "test", null, msg.getBytes());
    		}
    		
    		
    		//5.记得关闭相关的连接
    		channel.close();
    		connection.close();
    	}
    }
    

     

    public class Consumer {
    	public static void main(String[] args) throws Exception{
    				//1.创建一个ConnectionFactory 并进行配置
    				ConnectionFactory connectionFactory = new ConnectionFactory();
    				connectionFactory.setHost("192.168.244.11");
    				connectionFactory.setPort(5672);
    				connectionFactory.setVirtualHost("/");
    				connectionFactory.setHandshakeTimeout(20000);
    				//2.通过连接工厂创建连接
    				Connection connection = connectionFactory.newConnection();
    				
    				//3.通过Connection 创建一个 Channel
    				Channel channel = connection.createChannel();
    				
    				//4. 声明创建一个队列
    				String queueName = "test";
    				/**
    				 * durable 是否持久化
    				 * exclusive 独占的  相当于加了一把锁
    				 */
    				channel.queueDeclare(queueName,true,false,false,null);
    				
    				//5.创建消费者
    				QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    				
    				//6.设置channel
    				/**
    				 * ACK: 当一条消息从生产端发到消费端,消费端接收到消息后会马上回送一个ACK信息给broker,告诉它这条消息收到了
    				 * autoack: 
    				 * true  自动签收 当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。
    				 * false 手动签收 当消费者收到消息在合适的时候来显示的进行确认,说我已经接收到了该消息了,RabbitMQ可以从队列中删除该消息了
    				 * 
    				 */
    				channel.basicConsume(queueName, true, queueingConsumer);
    				
    				//7.获取消息
    				while(true){
    					Delivery delivery = queueingConsumer.nextDelivery();
    					String msg = new String(delivery.getBody());
    					System.err.println("消费端:" + msg);
    					//Envelope envelope = delivery.getEnvelope();
    				}
    	}
    }
    

     

    Exchange(交换机)详解

    Exchange: 接收消息,并根据路由键转发消息所绑定的队列

     

    交换机属性:

    • Name: 交换机名称
    • Type: 交换机类型 diect、topic、fanout、headers
    • Durability:是否需要持久化,true为持久化
    • AutoDelete: 当最后一个绑定到Exchange的队列删除后,自动删除该Exchange
    • Internal: 当前Exchange是否用于RabbitMQ内部使用,默认为false (百分之99的情况默认为false 除非对Erlang语言较了解,做一些扩展)
    • Arguments:扩展参数, 用于扩展AMQP协议可自定化使用

     

    Direct Exchange

    所有发送到Direct Exchange的消息被转发到RouteKey指定的Queue

    注意:Direct模式可以使用RabbitMQ自带的Exchange: default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RoutingKey必须完全匹配才会被队列接收,否则该消息会被抛弃

     

    public class ProducerDirectExchange {
    	public static void main(String[] args) throws Exception {
    		//1.创建ConnectionFactory
    		ConnectionFactory connectionFactory = new ConnectionFactory();
    		connectionFactory.setHost("192.168.244.11");
    		connectionFactory.setPort(5672);
    		connectionFactory.setVirtualHost("/");
    		
    		//2.创建Connection
    		Connection connection = connectionFactory.newConnection();
    		//3.创建Channel
    		Channel channel = connection.createChannel();
    		//4.声明
    		String exchangeName = "test_direct_exchange";
    		String routingKey = "test.direct";
    		//5.发送
    		String msg = "Hello World RabbitMQ4 Direct Exchange Message";
    		channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
    	}
    }
    

     

    public class ConsumerDirectExchange {
    	public static void main(String[] args) throws Exception{
    		ConnectionFactory connectionFactory = new ConnectionFactory();
    		connectionFactory.setHost("192.168.244.11");
    		connectionFactory.setPort(5672);
    		connectionFactory.setVirtualHost("/");
    		connectionFactory.setHandshakeTimeout(20000);
    		connectionFactory.setAutomaticRecoveryEnabled(true);
    		connectionFactory.setNetworkRecoveryInterval(3000);
    		
    		Connection connection = connectionFactory.newConnection();
    		
    		Channel channel = connection.createChannel();
    		//声明
    		String exchangeName = "test_direct_exchange";
    		String exchangeType = "direct";
    		String queueName = "test_direct_queue";
    		String routingKey = "test.direct";
    		//表示声明了一个交换机
    		channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);
    		//表示声明了一个队列
    		channel.queueDeclare(queueName,false,false,false,null);
    		//建立一个绑定关系
    		channel.queueBind(queueName, exchangeName, routingKey);
    		
    		//durable 是否持久化消息
    		QueueingConsumer consumer = new QueueingConsumer(channel);
    		//参数:队列名称,是否自动ACK,Consumer
    		channel.basicConsume(queueName, true, consumer);
    		
    		//循环获取消息
    		while(true){
    			//获取消息,如果没有消息,这一步将会一直阻塞
    			Delivery delivery = consumer.nextDelivery();
    			String msg = new String(delivery.getBody());
    			System.out.println("收到消息:" + msg);
    		}
    	}
    }
    

     

    Topic Exchange

    所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上

    Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

    注意:可以使用通配符进行匹配

    符号 # 匹配一个或多个词

    符号 * 匹配不多不少一个词

    例如: "log.#" 能够匹配到 “log.info.oa”

    ​"log.*" 只会匹配到 "log.err"

    public class ProducerTopicExchange {
    	public static void main(String[] args) throws Exception {
    		//1.创建ConnectionFactory
    		ConnectionFactory connectionFactory = new ConnectionFactory();
    		connectionFactory.setHost("192.168.244.11");
    		connectionFactory.setPort(5672);
    		connectionFactory.setVirtualHost("/");
    		connectionFactory.setHandshakeTimeout(20000);
    
    		//2.创建Connection
    		Connection connection = connectionFactory.newConnection();
    		//3.创建Channel
    		Channel channel = connection.createChannel();
    		//4.声明
    		String exchangeName = "test_topic_exchange";
    		String routingKey1 = "user.save";
    		String routingKey2 = "user.update";
    		String routingKey3 = "user.delete.abc";
    		//5.发送
    		String msg = "Hello World RabbitMQ4 Direct Exchange Message";
    		channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
    		channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
    		channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
    	}
    }
    

     

    public class ConsumerTopicExchange {
    	public static void main(String[] args) throws Exception{
    		ConnectionFactory connectionFactory = new ConnectionFactory();
    		connectionFactory.setHost("192.168.244.11");
    		connectionFactory.setPort(5672);
    		connectionFactory.setVirtualHost("/");
    		connectionFactory.setHandshakeTimeout(20000);
    		connectionFactory.setAutomaticRecoveryEnabled(true);
    		connectionFactory.setNetworkRecoveryInterval(3000);
    		
    		Connection connection = connectionFactory.newConnection();
    		
    		Channel channel = connection.createChannel();
    		//声明
    		String exchangeName = "test_topic_exchange";
    		String exchangeType = "topic";
    		String queueName = "test_topic_queue";
    		String routingKey = "user.#";
    		//表示声明了一个交换机
    		channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);
    		//表示声明了一个队列
    		channel.queueDeclare(queueName,false,false,false,null);
    		//建立一个绑定关系
    		channel.queueBind(queueName, exchangeName, routingKey);
    		
    		//durable 是否持久化消息
    		QueueingConsumer consumer = new QueueingConsumer(channel);
    		//参数:队列名称,是否自动ACK,Consumer
    		channel.basicConsume(queueName, true, consumer);
    		
    		//循环获取消息
    		while(true){
    			//获取消息,如果没有消息,这一步将会一直阻塞
    			Delivery delivery = consumer.nextDelivery();
    			String msg = new String(delivery.getBody());
    			System.out.println("收到消息:" + msg);
    		}
    	}
    }
    

     

    Fanout Exchange

    不处理路由键,只需要简单的将队列绑定到交换机上,发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
    所以Fanout交换机转发消息是最快的

     

    public class ProducerFanoutExchange {
    	public static void main(String[] args) throws Exception {
    		//1.创建ConnectionFactory
    		ConnectionFactory connectionFactory = new ConnectionFactory();
    		connectionFactory.setHost("192.168.244.11");
    		connectionFactory.setPort(5672);
    		connectionFactory.setVirtualHost("/");
    		connectionFactory.setHandshakeTimeout(20000);
    
    		//2.创建Connection
    		Connection connection = connectionFactory.newConnection();
    		//3.创建Channel
    		Channel channel = connection.createChannel();
    		//4.声明
    		String exchangeName = "test_fanout_exchange";
    		//5.发送
    		for(int i = 0; i < 10 ; i++){
    			String msg = "Hello World RabbitMQ4 Direct Exchange Message";
    			channel.basicPublish(exchangeName, "", null, msg.getBytes());
    		}
    		channel.close();
    		connection.close();
    	}
    }
    

     

    public class ConsumerFanoutExchange {
    	public static void main(String[] args) throws Exception{
    		ConnectionFactory connectionFactory = new ConnectionFactory();
    		connectionFactory.setHost("192.168.244.11");
    		connectionFactory.setPort(5672);
    		connectionFactory.setVirtualHost("/");
    		connectionFactory.setHandshakeTimeout(20000);
    		connectionFactory.setAutomaticRecoveryEnabled(true);
    		connectionFactory.setNetworkRecoveryInterval(3000);
    		
    		Connection connection = connectionFactory.newConnection();
    		
    		Channel channel = connection.createChannel();
    		//声明
    		String exchangeName = "test_fanout_exchange";
    		String exchangeType = "fanout";
    		String queueName = "test_topic_queue";
    		//无需指定路由key 
    		String routingKey = "";
    		//表示声明了一个交换机
    		channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);
    		//表示声明了一个队列
    		channel.queueDeclare(queueName,false,false,false,null);
    		//建立一个绑定关系
    		channel.queueBind(queueName, exchangeName, routingKey);
    		
    		//durable 是否持久化消息
    		QueueingConsumer consumer = new QueueingConsumer(channel);
    		//参数:队列名称,是否自动ACK,Consumer
    		channel.basicConsume(queueName, true, consumer);
    		
    		//循环获取消息
    		while(true){
    			//获取消息,如果没有消息,这一步将会一直阻塞
    			Delivery delivery = consumer.nextDelivery();
    			String msg = new String(delivery.getBody());
    			System.out.println("收到消息:" + msg);
    		}
    	}
    }
    

     

    Message 消息

    服务器与应用程序之间传递的数据,本质上就是一段数据,由Properties和Body组成

    常用属性:delivery mode、headers (自定义属性)

    其他属性:content_type、content_encoding、priority、expiration

    消息的properties属性用法示例:

    public class Procuder {
    	public static void main(String[] args) throws Exception {
    		
    		//1.创建一个ConnectionFactory 并进行配置
    		ConnectionFactory connectionFactory = new ConnectionFactory();
    		connectionFactory.setHost("192.168.244.11");
    		connectionFactory.setPort(5672);
    		connectionFactory.setVirtualHost("/");
    		connectionFactory.setHandshakeTimeout(20000);
    		//2.通过连接工厂创建连接
    		Connection connection = connectionFactory.newConnection();
    		
    		//3.通过Connection 创建一个 Channel
    		Channel channel = connection.createChannel();
    	
    		Map<String,Object> headers = new HashMap<>();
    		headers.put("my1", "111");
    		headers.put("my2", "222");
    		
    		//10秒不消费 消息过期移除消息队列
    		AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
    				.deliveryMode(2)
    				.contentEncoding("utf-8")
    				.expiration("10000")
    				.headers(headers)
    				.build();
    		
    		//4.通过Channel发送数据
    		for(int i = 0; i < 5; i++){
    		  System.out.println("生产消息:" + i);
    		  String msg = "Hello RabbitMQ" + i;
    	      channel.basicPublish("", "test", properties, msg.getBytes());
    		}
    		
    		
    		//5.记得关闭相关的连接
    		channel.close();
    		connection.close();
    	}
    }
    

     

    public class Consumer {
    	public static void main(String[] args) throws Exception{
    				//1.创建一个ConnectionFactory 并进行配置
    				ConnectionFactory connectionFactory = new ConnectionFactory();
    				connectionFactory.setHost("192.168.244.11");
    				connectionFactory.setPort(5672);
    				connectionFactory.setVirtualHost("/");
    				connectionFactory.setHandshakeTimeout(20000);
    				//2.通过连接工厂创建连接
    				Connection connection = connectionFactory.newConnection();
    				
    				//3.通过Connection 创建一个 Channel
    				Channel channel = connection.createChannel();
    				
    				//4. 声明创建一个队列
    				String queueName = "test";
    				channel.queueDeclare(queueName,true,false,false,null);
    				
    				//5.创建消费者
    				QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    				
    				//6.设置channel
    				channel.basicConsume(queueName, true, queueingConsumer);
    				
    				//7.获取消息
    				while(true){
    					Delivery delivery = queueingConsumer.nextDelivery();
    					String msg = new String(delivery.getBody());
    					System.err.println("消费端:" + msg);
    					
    					Map<String, Object> headers = delivery.getProperties().getHeaders();
    					System.err.println("headers value:" + headers.get("my1"));
    				}
    	}
    }
    
  • 相关阅读:
    异步任务----django-celery
    signal函数
    shell脚本字符显示颜色
    echo输出到文件
    windows下opencv安装
    模板
    下载vs地址
    关联容器 map
    构造函数初始化列表
    assert() fflush()
  • 原文地址:https://www.cnblogs.com/dwlovelife/p/10982735.html
Copyright © 2011-2022 走看看