消息队列RabbitMQ
消息队列是应用程序之间的一种通信方式:对于无需即时返回结果且比较耗时的操作,可以通过消息队列进行异步处理,从而提高系统的吞吐量,并实现程序之间的解耦。
-
安装软件:注意RabbitMQ和erlang的版本需要对应,要以管理员的身份运行。
-
创建工程:添加amqp-client客户端依赖。
-
生产者发送消息到RabbitMQ队列,消费者从消息队列中获取消息,步骤如下:
-
将创建连接工厂的方法抽取为一个工具类
public class ConnectionUntil { public static Connection getConnection() throws Exception { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置主机 factory.setHost("localhost"); //设置端口号 factory.setPort(5672); //设置虚拟主机 factory.setVirtualHost("/simple_test"); //设置用户名 factory.setUsername("admin"); //设置密码 factory.setPassword("123456"); return factory.newConnection(); } }
-
simple简单模式
- 生产者
/**
 * Simple-mode producer: publishes one text message to {@link #QUEUE_NAME} through the
 * default exchange, then releases the channel and the connection.
 */
public class producter {

    /** Name of the queue used by the simple-mode producer and consumer. */
    public static final String QUEUE_NAME = "simple_queue";

    /**
     * Declares the queue, publishes a single message and closes all resources.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or publishing fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the channel first and then the connection,
        // including when queueDeclare or basicPublish throws.
        try (Connection connection = ConnectionUntil.getConnection();
             Channel channel = connection.createChannel()) {

            // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            String message = "helle,rabbitma";

            // Arguments: exchange ("" selects the default exchange), routing key,
            // message properties, message body.
            // The body is encoded explicitly as UTF-8 so it does not depend on the
            // platform default charset; the consumer decodes it as "utf-8".
            channel.basicPublish("", QUEUE_NAME, null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
- 消费者
public class Consoumer { public static void main(String[] args) throws Exception { //创建连接 Connection connection = ConnectionUntil.getConnection(); //创建通道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(producter.QUEUE_NAME,true,false,false,null); //创建消费者 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(envelope.getRoutingKey()+"=========="); System.out.println(envelope.getDeliveryTag()+"=========="); System.out.println(envelope.getExchange()+"=========="); System.out.println(new String(body,"utf-8")+"=========="); } }; //监听队列 channel.basicConsume(producter.QUEUE_NAME,true,consumer); } }
-
工作模式
当同一个队列有两个或两个以上的消费者时,它们之间是竞争关系:一条消息只会被其中一个消费者处理。
- 生产者
public class Producter { //声明队列 public static final String WORKE_QUEUE = "worke_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUntil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(WORKE_QUEUE,true,false,false,null); for (int i = 0;i<10;i++){ String message = "发送消息了"+i; channel.basicPublish("",WORKE_QUEUE,null,message.getBytes()); System.out.println(message); } channel.close(); } }
- 消费者1
public class Consoumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUntil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(Producter.WORKE_QUEUE,true,false,false,null); //每次可以预取多少消息 channel.basicQos(1); DefaultConsumer consoumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(envelope.getDeliveryTag()); System.out.println(new String(body,"utf-8")); //false表示只有当前消息被处理 channel.basicAck(envelope.getDeliveryTag(),true); } }; channel.basicConsume(Producter.WORKE_QUEUE,true,consoumer); } }
- 消费者2
public class Consoumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUntil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(Producter.WORKE_QUEUE,true,false,false,null); System.out.println("kaishi "); DefaultConsumer consoumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(envelope.getDeliveryTag()); System.out.println(new String(body,"utf-8")); } }; System.out.println("监听"); channel.basicConsume(Producter.WORKE_QUEUE,true,consoumer); } }
消息交换机(Exchange)常见的有3种类型。交换机本身不存储消息,只负责转发;路由到同一个队列中的消息,对该队列的多个消费者来说依旧是竞争关系:
1. Fanout:广播,将消息交给所有绑定到交换机的队列。
2. Direct:定向,将消息交给指定 routing key的队列。
3. Topic:通配符,把消息交给符合 routing pattern的队列。
1)*
恰好匹配一个单词
2)#
匹配零个或多个单词
-
发布与订阅模式:Fanout
- 生产者
public class FanoutProducter { //声明交换机 public static String FANOUT_EXCHANGE = "fanout_exchange"; //声明队列 public static String FANOUT_QUEUE_1 = "fanout_queue_1"; public static String FANOUT_QUEUE_2 = "fanout_queue_2"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUntil.getConnection(); Channel channel = connection.createChannel(); //绑定交换机 channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); //绑定队列 channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null); channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null); //队列绑定到交换机 channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,""); channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,""); for (int i = 0; i<10;i++){ String message = "fanout"+i; channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes()); } channel.close(); } }
- 消费者1
public class FanoutConsoumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUntil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(FanoutProducter.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT); channel.queueDeclare(FanoutProducter.FANOUT_QUEUE_1,true,false,false,null); channel.queueBind(FanoutProducter.FANOUT_QUEUE_1,FanoutProducter.FANOUT_EXCHANGE,""); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(envelope.getDeliveryTag()); System.out.println(envelope.getExchange()); System.out.println(new String(body,"utf-8")); } }; channel.basicConsume(FanoutProducter.FANOUT_QUEUE_1,true,consumer); } }
- 消费者2
public class FanoutConsoumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUntil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(FanoutProducter.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT); channel.queueDeclare(FanoutProducter.FANOUT_QUEUE_2,true,false,false,null); channel.queueBind(FanoutProducter.FANOUT_QUEUE_2,FanoutProducter.FANOUT_EXCHANGE,""); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(envelope.getDeliveryTag()); System.out.println(envelope.getExchange()); System.out.println(new String(body,"utf-8")); } }; channel.basicConsume(FanoutProducter.FANOUT_QUEUE_2,true,consumer); } }
-
路由模式:Direct
消息发送时携带routing key,只有当队列绑定到交换机时指定的key与消息的routing key完全一致时,该队列才能接收到消息。
- 生产者
/**
 * Routing-mode producer: declares a direct exchange with two queues, binds each queue
 * to one routing key and publishes one message per key.
 */
public class DirectPro {

    /** Name of the direct exchange. */
    public static final String direct_exchange = "direct_exchange";

    /** Queue bound with routing key "insert" (same key DirectCon1 binds). */
    public static final String direct_queue1 = "direct_queue_1";

    /** Queue bound with routing key "update" (same key DirectCon2 binds). */
    public static final String direct_queue2 = "direct_queue_2";

    /**
     * Sets up exchange, queues and bindings, publishes two messages and closes the
     * channel and the connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or publishing fails
     */
    public static void main(String[] args) throws Exception {
        // Channel is closed first, then the connection, on both normal and
        // exceptional exit.
        try (Connection connection = ConnectionUntil.getConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(direct_exchange, BuiltinExchangeType.DIRECT);

            // Declare the queues: durable, non-exclusive, no auto-delete.
            channel.queueDeclare(direct_queue1, true, false, false, null);
            channel.queueDeclare(direct_queue2, true, false, false, null);

            // Use the same key per queue as the consumers do (queue1 -> "insert",
            // queue2 -> "update"). Binding the opposite keys here would leave each
            // queue bound to both keys once the consumers have run.
            // NOTE: this code never unbinds, so bindings created by earlier runs
            // with different keys are not removed.
            channel.queueBind(direct_queue1, direct_exchange, "insert");
            channel.queueBind(direct_queue2, direct_exchange, "update");

            String message = "2222222222";
            channel.basicPublish(direct_exchange, "insert", null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));

            message = "111111111111111";
            channel.basicPublish(direct_exchange, "update", null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
- 消费者1
public class DirectCon1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUntil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(DirectPro.direct_exchange, BuiltinExchangeType.DIRECT); channel.queueDeclare(DirectPro.direct_queue1,true,false,false,null); channel.queueBind(DirectPro.direct_queue1,DirectPro.direct_exchange,"insert"); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(consumerTag); System.out.println(envelope.getExchange()); System.out.println(envelope.getRoutingKey()); System.out.println(envelope.getDeliveryTag()); System.out.println(new String(body,"utf-8")); } }; channel.basicConsume(DirectPro.direct_queue1,true,consumer); } }
- 消费者2
public class DirectCon2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUntil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(DirectPro.direct_exchange, BuiltinExchangeType.DIRECT); channel.queueDeclare(DirectPro.direct_queue2,true,false,false,null); channel.queueBind(DirectPro.direct_queue2,DirectPro.direct_exchange,"update"); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(consumerTag); System.out.println(envelope.getExchange()); System.out.println(envelope.getRoutingKey()); System.out.println(envelope.getDeliveryTag()); System.out.println(new String(body,"utf-8")); } }; channel.basicConsume(DirectPro.direct_queue2,true,consumer); } }
-
通配符模式:Topic
队列绑定时可以使用通配符 * 和 # 来匹配消息的路由key,匹配成功的队列即可收到消息。
-