MQ连接工厂还是之前的那个Connection
1 package com.mmr.rabbitmq.util; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.ConnectionFactory; 7 8 public class ConnectionUtils { 9 /** 10 * @desc 获取Mq 的链接 11 * @author zp 12 * @throws IOException 13 * @date 2018-7-19 14 */ 15 public static Connection getConnection() throws IOException { 16 // 1.定义一个链接工厂 17 ConnectionFactory factroy = new ConnectionFactory(); 18 19 // 2.设置服务地址 20 factroy.setHost("127.0.0.1"); 21 22 // 3.设置端口号 23 factroy.setPort(5672); 24 25 // 4.vhost 设置数据库 26 factroy.setVirtualHost("vhtest"); 27 28 // 5.设置用户名 29 factroy.setUsername("jerry"); 30 31 // 6. 设置密码 32 factroy.setPassword("123456"); 33 34 // 7.返回链接 35 return factroy.newConnection(); 36 } 37 }
消息生产者类的定义Send
1 package com.mmr.rabbitmq.work; 2 3 import java.io.IOException; 4 5 import com.mmr.rabbitmq.util.ConnectionUtils; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 9 public class Send { 10 11 /* 12 * |--C1 13 * P-------|--C2 14 * |--C3 15 * 16 * */ 17 private static final String QUEUE_NAME="test_work_queue"; 18 public static void main(String[] args) throws IOException, InterruptedException{ 19 // 获取链接 20 Connection connection = ConnectionUtils.getConnection(); 21 22 // 获取通道 23 Channel channel = connection.createChannel(); 24 // 声明队列 25 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 26 27 for (int i = 0; i < 50; i++) { 28 String msg = "hello "+i; 29 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); 30 System.out.println("send msg 的第"+i+"条"); 31 Thread.sleep(i*20); 32 } 33 channel.close(); 34 connection.close(); 35 } 36 }
消息消费者类的定义 Recv1 Recv2
1 package com.mmr.rabbitmq.work; 2 3 import java.io.IOException; 4 5 import com.mmr.rabbitmq.util.ConnectionUtils; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.Consumer; 9 import com.rabbitmq.client.DefaultConsumer; 10 import com.rabbitmq.client.Envelope; 11 import com.rabbitmq.client.AMQP.BasicProperties; 12 13 public class Recv1 { 14 private static final String QUEUE_NAME="test_work_queue"; 15 public static void main(String[] args) throws IOException{ 16 // 获取链接 17 Connection connection = ConnectionUtils.getConnection(); 18 19 //获取频道 20 21 Channel channel = connection.createChannel(); 22 23 // 声明队列 24 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 25 26 // 定义一个消费者 27 Consumer consumer = new DefaultConsumer(channel){ 28 // 一旦有消息 就会触发这个方法 消息到达 29 @Override 30 public void handleDelivery(String consumerTag, Envelope envelope, 31 BasicProperties properties, byte[] body) throws IOException { 32 // TODO Auto-generated method stub 33 // 拿消息 34 String msg = new String(body,"utf-8"); 35 36 //搭出来 37 System.out.println("[1]Recv msg:"+msg); 38 try { 39 Thread.sleep(2000); 40 } catch (Exception e) { 41 // TODO: handle exception 42 e.printStackTrace(); 43 }finally{ 44 System.out.println("[1] done"); 45 } 46 } 47 }; 48 boolean autoAck = true; 49 channel.basicConsume(QUEUE_NAME, autoAck,consumer); 50 51 } 52 }
1 package com.mmr.rabbitmq.work; 2 3 import java.io.IOException; 4 5 import com.mmr.rabbitmq.util.ConnectionUtils; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.Consumer; 9 import com.rabbitmq.client.DefaultConsumer; 10 import com.rabbitmq.client.Envelope; 11 import com.rabbitmq.client.AMQP.BasicProperties; 12 13 public class Recv2 { 14 private static final String QUEUE_NAME="test_work_queue"; 15 public static void main(String[] args) throws IOException{ 16 // 获取链接 17 Connection connection = ConnectionUtils.getConnection(); 18 19 //获取频道 20 21 Channel channel = connection.createChannel(); 22 23 // 声明队列 24 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 25 26 // 定义一个消费者 27 Consumer consumer = new DefaultConsumer(channel){ 28 // 一旦有消息 就会触发这个方法 消息到达 29 @Override 30 public void handleDelivery(String consumerTag, Envelope envelope, 31 BasicProperties properties, byte[] body) throws IOException { 32 // TODO Auto-generated method stub 33 // 拿消息 34 String msg = new String(body,"utf-8"); 35 36 //搭出来 37 System.out.println("[2]Recv msg:"+msg); 38 try { 39 Thread.sleep(1000); 40 } catch (Exception e) { 41 // TODO: handle exception 42 e.printStackTrace(); 43 }finally{ 44 System.out.println("[2] done"); 45 } 46 } 47 }; 48 boolean autoAck = true; 49 channel.basicConsume(QUEUE_NAME, autoAck,consumer); 50 51 } 52 }
1.首先我们运行Recv1 Recv2 对消息进行监听
2.其次我们运行Send,开始生产消息。
3.最后得到的结果是:消费者1(收到的都是偶数编号的消息)和消费者2(收到的都是奇数编号的消息)处理的消息数量是一样的
为什么会出现这种现象呢?
----这种方式叫做轮询分发(round-robin),结果就是不管谁忙谁闲,都不会多给一个消息,任务就是你一个我一个。