zoukankan      html  css  js  c++  java
  • rabbitmq简单实例

    JMS组件:activemq(慢)
    AMQP组件(advance message queue protocol):rabbitmq和kafka

    一.、消息队列解决了什么问题?
    异步处理
    应用解耦
    流量削锋
    日志处理

    二、rabbitmq安装与配置

    三、java操作rabbitmq
    1. simple 简单队列
    2. work queues 工作队列 公平分发 轮询分发
    3. publish/subscribe 发布于订阅
    4. routing 路由选择 通配符模式
    5. topics 主题
    6. 手动和自动确认消息
    7. 队列的持久化和非持久化
    8. rabbitmq的延迟队列

    四、spring AMQP spring-rabbitmq

    五、场景demo mq实现搜索引擎DIH增量

    六、场景demo 未支付订单30分钟,取消

    七、大数据应用 类似百度统计 cnzz架构 消息队列

    一、简单队列

    ConnectionUtils.java

     1 public class ConnectionUtils {
     2     public static Connection getConnection() throws IOException, TimeoutException {
     3         ConnectionFactory factory = new ConnectionFactory();
     4         factory.setHost("127.0.0.1");
     5         factory.setPort(5672);
     6         factory.setVirtualHost("/vhost_mmr");
     7         factory.setUsername("cxx");
     8         factory.setPassword("cxx");
     9         return factory.newConnection();
    10     }
    11 }
    View Code

    Send.java

     1 /**
     2  * 生产者发送消息
     3  */
     4 public class Send {
     5 
     6     private static final String QUEUE_NAME = "test_simple_queue";
     7 
     8     public static void main(String[] args) throws IOException, TimeoutException {
     9         Connection connection = ConnectionUtils.getConnection();
    10 
    11         Channel channel = connection.createChannel();
    12         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    13 
    14         String msg = "hello simple!!!!!!!!!!!";
    15         channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    16         System.out.println("--send msg;" + msg);
    17         channel.close();
    18         connection.close();
    19     }
    20 }
    View Code

    Receive.java

     1 /**
     2  * 消费者获取消息
     3  */
     4 public class Receive {
     5 
     6     private static final String QUEUE_NAME = "test_simple_queue";
     7 
     8     public static void main(String[] args) throws IOException, TimeoutException {
     9         //获取链接
    10         Connection connection = ConnectionUtils.getConnection();
    11         //创建通道
    12         Channel channel = connection.createChannel();
    13         //队列声明
    14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    15 
    16         DefaultConsumer consumer = new DefaultConsumer(channel) {
    17             //获取到达的消息
    18             @Override
    19             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    20                 super.handleDelivery(consumerTag, envelope, properties, body);
    21                 String msg = new String(body, "utf-8");
    22                 System.out.println("new api recv:" + msg);
    23             }
    24         };
    25 
    26         //监听队列
    27         channel.basicConsume(QUEUE_NAME, true, consumer);
    28     }
    29 }
    View Code

    二、工作队列

    2.1 轮询分发

    Send.java

     1 /**
     2  * ---c1
     3  * p---Queue----
     4  * ---c2
     5  */
     6 public class Send {
     7     private static final String QUEUE_NAME = "test_work_queue";
     8 
     9     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    10         //获取链接
    11         Connection connection = ConnectionUtils.getConnection();
    12         //获取channel
    13         Channel channel = connection.createChannel();
    14         //声明队列
    15         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    16         for (int i = 0; i < 50; i++) {
    17             String msg = "hello" + i;
    18 
    19             System.out.println("[WQ ] send:" + msg);
    20             channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    21             Thread.sleep(i * 20);
    22         }
    23         channel.close();
    24         connection.close();
    25     }
    26 }
    View Code

    Receive1.java

     1 public class Receive1 {
     2     public static final String QUEUE_NAME = "test_work_queue";
     3 
     4     public static void main(String[] args) throws IOException, TimeoutException {
     5         //创建链接
     6         Connection connection = ConnectionUtils.getConnection();
     7         //创建频道
     8         Channel channel = connection.createChannel();
     9         //声明队列
    10         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    11 
    12         //定义一个消费者
    13         Consumer consumer = new DefaultConsumer(channel) {
    14             @Override
    15             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    16                 String msg = new String(body, "utf-8");
    17                 System.out.println("[1] Recv msg :" + msg);
    18                 try {
    19                     Thread.sleep(2000);
    20                 } catch (InterruptedException e) {
    21                     e.printStackTrace();
    22                 } finally {
    23                     System.out.println("[1] done");
    24                 }
    25             }
    26         };
    27         boolean autoAck = false;
    28         channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    29     }
    30 }
    View Code

    Receive2.java

     1 public class Receive2 {
     2     public static final String QUEUE_NAME = "test_work_queue";
     3 
     4     public static void main(String[] args) throws IOException, TimeoutException {
     5         //创建链接
     6         Connection connection = ConnectionUtils.getConnection();
     7         //创建频道
     8         Channel channel = connection.createChannel();
     9         //声明队列
    10         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    11 
    12         //定义一个消费者
    13         Consumer consumer = new DefaultConsumer(channel) {
    14             @Override
    15             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    16                 String msg = new String(body, "utf-8");
    17                 System.out.println("[2] Recv msg :" + msg);
    18                 try {
    19                     Thread.sleep(1000);
    20                 } catch (InterruptedException e) {
    21                     e.printStackTrace();
    22                 } finally {
    23                     System.out.println("[2] done");
    24                 }
    25             }
    26         };
    27         boolean autoAck = false;
    28         channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    29     }
    30 }
    View Code

    现象:消费者1和消费者2处理的消息是一样的

               消费者1偶数

               消费者1奇数

               这种方式叫做轮询分发(round-robin),结果就是不管谁忙活着谁清闲,都不会多给一个消息,任意消息总是你一个,我一个

    2.2 公平分发(fair dipatch)

    Send.java

     1 /**
     2  * ---c1
     3  * p---Queue----
     4  * ---c2
     5  */
     6 public class Send {
     7     private static final String QUEUE_NAME = "test_work_queue";
     8 
     9     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    10         //获取链接
    11         Connection connection = ConnectionUtils.getConnection();
    12         //获取channel
    13         Channel channel = connection.createChannel();
    14         //声明队列
    15         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    16         /**
    17          * 每个消费者发送消费之前,消息队列不发送下一个消息到消费者,一次只处理一个消息
    18          * 限制发送给同一个消费者不得超过一个消息
    19          */
    20         int prefetchCount = 1;
    21         channel.basicQos(prefetchCount);
    22 
    23         for (int i = 0; i < 50; i++) {
    24             String msg = "hello" + i;
    25 
    26             System.out.println("[WQ ] send:" + msg);
    27             channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    28             Thread.sleep(i * 5);
    29         }
    30         channel.close();
    31         connection.close();
    32     }
    33 }
    View Code

    Receive1.java

     1 public class Receive1 {
     2     public static final String QUEUE_NAME = "test_work_queue";
     3 
     4     public static void main(String[] args) throws IOException, TimeoutException {
     5         //创建链接
     6         Connection connection = ConnectionUtils.getConnection();
     7         //创建频道
     8         final Channel channel = connection.createChannel();
     9         //声明队列
    10         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    11         //保证一次只发送一个
    12         channel.basicQos(1);
    13 
    14         //定义一个消费者
    15         Consumer consumer = new DefaultConsumer(channel) {
    16             @Override
    17             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    18                 String msg = new String(body, "utf-8");
    19                 System.out.println("[1] Recv msg :" + msg);
    20                 try {
    21                     Thread.sleep(2000);
    22                 } catch (InterruptedException e) {
    23                     e.printStackTrace();
    24                 } finally {
    25                     System.out.println("[1] done");
    26                     //手动回执
    27                     channel.basicAck(envelope.getDeliveryTag(), false);
    28                 }
    29             }
    30         };
    31         boolean autoAck = false;
    32         channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    33     }
    34 }
    View Code

    Receive2.java

     1 public class Receive2 {
     2     public static final String QUEUE_NAME = "test_work_queue";
     3 
     4     public static void main(String[] args) throws IOException, TimeoutException {
     5         //创建链接
     6         Connection connection = ConnectionUtils.getConnection();
     7         //创建频道
     8         final Channel channel = connection.createChannel();
     9         //声明队列
    10         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    11         //保证一次只发送一个
    12         channel.basicQos(1);
    13         //定义一个消费者
    14         Consumer consumer = new DefaultConsumer(channel) {
    15             @Override
    16             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    17                 String msg = new String(body, "utf-8");
    18                 System.out.println("[2] Recv msg :" + msg);
    19                 try {
    20                     Thread.sleep(1000);
    21                 } catch (InterruptedException e) {
    22                     e.printStackTrace();
    23                 } finally {
    24                     System.out.println("[2] done");
    25                     //手动回执
    26                     channel.basicAck(envelope.getDeliveryTag(), false);
    27                 }
    28             }
    29         };
    30         boolean autoAck = false;
    31         channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    32     }
    33 }
    View Code

     现象:消费者2处理的消息比消费者1多,能者多劳

    三、消息与应答ack与消息持久化durable

     boolean autoAck = false;

     channel.basicConsume(QUEUE_NAME, autoAck, consumer);

     boolean autoAck = true;(自动确认模式)一旦rabbitmq将消息分发给消费者,就会从内存中删除

     这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息

     boolnea autoAck = false;(手动模式),如果有一个消费者挂掉,就会交付给其他消费者,

     rabbitmq支持消息应答,消费者发送一个应答,告诉rabbitmq这个消息我已经处理完成,你可以删除了,然后rabbitmq就删除

    内存中的消息

    消息应答模式是打开的,false

    Message acknowkedgment

    消息持久化

    boolean durable = false
    channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
  • 相关阅读:
    powerdesigner简单使用
    linux进程间通信方式
    linux中fork()函数详解(原创!!实例讲解)
    platform_device与platform_driver
    当心不静的时候
    linux移植简介[MS2]
    使用maven的tomcat:run进行web项目热部署
    SpringMVC &amp; Struts2
    开放产品开发(OPD):OPD框架
    【Android个人理解(八)】跨应用调用不同组件的方法
  • 原文地址:https://www.cnblogs.com/cxxjohnson/p/9101454.html
Copyright © 2011-2022 走看看