zoukankan      html  css  js  c++  java
  • (转) RabbitMQ学习之工作队列(java)

    http://blog.csdn.net/zhu_tianwei/article/details/40887717

     参考:http://blog.csdn.NET/lmj623565791/article/details/37620057

    1.生产任务Task.Java

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.workqueue;  
    2.   
    3. import com.rabbitmq.client.AMQP;  
    4. import com.rabbitmq.client.Channel;  
    5. import com.rabbitmq.client.Connection;  
    6. import com.rabbitmq.client.ConnectionFactory;  
    7. import com.rabbitmq.client.MessageProperties;  
    8.   
    9. public class Task {  
    10.       
    11.     //队列名称    
    12.     private final static String QUEUE_NAME = "workqueue-durable";    
    13.   
    14.     public static void main(String[] args) throws Exception {  
    15.          //创建连接和频道    
    16.         ConnectionFactory factory = new ConnectionFactory();    
    17.         factory.setHost("192.168.101.174");    
    18.         //指定用户 密码  
    19.         factory.setUsername("admin");  
    20.         factory.setPassword("admin");  
    21.         //指定端口  
    22.         factory.setPort(AMQP.PROTOCOL.PORT);  
    23.         Connection connection = factory.newConnection();    
    24.         Channel channel = connection.createChannel();    
    25.         boolean durable = true; //设置消息持久化  RabbitMQ不允许使用不同的参数重新定义一个队列,所以已经存在的队列,我们无法修改其属性。  
    26.         //声明队列   
    27.         channel.queueDeclare(QUEUE_NAME, durable, false, false, null);    
    28.          
    29.         //发送10条消息,依次在消息后面附加1-10个点    
    30.         for (int i = 5; i > 0; i--)    
    31.         {    
    32.             String dots = "";    
    33.             for (int j = 0; j <= i; j++)    
    34.             {    
    35.                 dots += ".";    
    36.             }    
    37.             String message = "helloworld" + dots+dots.length();    
    38.             //MessageProperties.PERSISTENT_TEXT_PLAIN 标识我们的信息为持久化的  
    39.             channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());    
    40.             System.out.println("Sent Message:'" + message + "'");    
    41.         }    
    42.         //关闭频道和资源    
    43.         channel.close();    
    44.         connection.close();    
    45.     }  
    46.   
    47. }  

    2.消费工作队列

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.workqueue;  
    2.   
    3. import com.rabbitmq.client.AMQP;  
    4. import com.rabbitmq.client.Channel;  
    5. import com.rabbitmq.client.Connection;  
    6. import com.rabbitmq.client.ConnectionFactory;  
    7. import com.rabbitmq.client.QueueingConsumer;  
    8.   
    9. public class Work {  
    10.     //队列名称    
    11.     private final static String QUEUE_NAME = "workqueue-durable";    
    12.       
    13.     public static void main(String[] args) throws Exception {  
    14.          //区分不同工作进程的输出    
    15.         int hashCode = Work.class.hashCode();    
    16.         //创建连接和频道    
    17.         ConnectionFactory factory = new ConnectionFactory();    
    18.         factory.setHost("192.168.101.174");    
    19.         //指定用户 密码  
    20.         factory.setUsername("admin");  
    21.         factory.setPassword("admin");  
    22.         //指定端口  
    23.         factory.setPort(AMQP.PROTOCOL.PORT);  
    24.         Connection connection = factory.newConnection();    
    25.         Channel channel = connection.createChannel();    
    26.         boolean durable = true; //设置消息持久化  RabbitMQ不允许使用不同的参数重新定义一个队列,所以已经存在的队列,我们无法修改其属性。  
    27.         //声明队列    
    28.         channel.queueDeclare(QUEUE_NAME, durable, false, false, null);    
    29.         
    30.         QueueingConsumer consumer = new QueueingConsumer(channel);    
    31.           
    32.         /** 
    33.          * ack= true: Round-robin 转发   消费者被杀死,消息会丢失 
    34.          * ack=false:消息应答 ,为了保证消息永远不会丢失,RabbitMQ支持消息应答(message acknowledgments)。 
    35.          * 消费者发送应答给RabbitMQ,告诉它信息已经被接收和处理,然后RabbitMQ可以自由的进行信息删除。 
    36.          * 如果消费者被杀死而没有发送应答,RabbitMQ会认为该信息没有被完全的处理,然后将会重新转发给别的消费者。 
    37.          * 通过这种方式,你可以确认信息不会被丢失,即使消者偶尔被杀死。 
    38.          * 消费者需要耗费特别特别长的时间是允许的。 
    39.          *  
    40.          */  
    41.           
    42.         boolean ack = false ; //打开应答机制   
    43.         // 指定消费队列    
    44.         channel.basicConsume(QUEUE_NAME, ack, consumer);    
    45.           
    46.           
    47.         //公平转发  设置最大服务转发消息数量    只有在消费者空闲的时候会发送下一条信息。  
    48.         int prefetchCount = 1;  
    49.         channel.basicQos(prefetchCount);  
    50.           
    51.         while (true)    
    52.         {    
    53.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
    54.             String message = new String(delivery.getBody());    
    55.     
    56.             System.out.println(hashCode + " Received Message:'" + message + "'");    
    57.             doWork(message);    
    58.             System.out.println(hashCode + " Received Done");    
    59.             //发送应答    
    60.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);    
    61.     
    62.         }    
    63.     }  
    64.       
    65.     /**  
    66.      * 每个点耗时1s  
    67.      * @param task  
    68.      * @throws InterruptedException  
    69.      */    
    70.     private static void doWork(String task) throws InterruptedException    
    71.     {    
    72.         for (char ch : task.toCharArray())    
    73.         {    
    74.             if (ch == '.')    
    75.                 Thread.sleep(1000);    
    76.         }    
    77.     }    
    78.   
    79. }  

    多启动几个消费者工作进程,使用生产者发送消息,可以观察消费情况。

    要了解RabbitMQ的路由机制,exchange是一个关键。exchange可以叫做交换机,也似乎可以叫做路由器,反正它是用来选择路由的。RabbitMQ的核心思想就是消息的发布者不是直接把消息发送到目标队列中的,事实上,通常它并不知道消息要发到哪个队列中,它只知道把消息队列发送到exchange中。exchange一边接收发送者发过来的消息,而另一边则把消息发送到目标队列中去。exchange一定知道哪些队列需要接收这个消息,是加到一个队列里还是加到好几个队列里,还是直接扔掉。

    如果用空字符串去申明一个exchange,那么系统就会使用"amq.direct"这个exchange。前面我们使用的都是amq.direct类型。

    channel.BasicPublish("", "TaskQueue", properties, bytes);

    direct exchange 发送消息是要看routingKey的。举个例子,定义了一个direct exchange 名字是X1,然后一个queue名字为Q1 用routingKey=K1 绑定到exchange X1上,当一个routeKey为 K2 的消息到达X1上,那么只有K1=K2的时候,这个消息才能到达Q1上。

    fanout类型的exchange就比较好理解。就是简单的广播,而且是忽略routingKey的。所以只要是有queue绑定到fanout exchange上,通过这个exchange发送的消息都会被发送到那些绑定的queue中,不管你有没有输入routingKey。

    Topic类型的exchange给与我们更大的灵活性。通过定义routingKey可以有选择的订阅某些消息,此时routingKey就会是一个表达式。exchange会通过匹配绑定的routingKey来决定是否要把消息放入对应的队列中。有两种表达式符号可以让我们选择:#和*。

    *(星号):代表任意的一个词。 例:*.a会匹配a.a,b.a,c.a等

    #(井号):代码任意的0个或多个词。 例:#.a会匹配a.a,aa.a,aaa.a等

    topic exchange 有时候的行为会像其他类型的exchange,比如说:

    当routingKey只是有#号的时候,它的行为和fanout的行为是一样的。

    当routingKey什么的没有,空字符串的时候,它的行为是和direct是一样的。

    要注意的是,符号代表的是词不是字符。RabbitMQ中在表达式中词的定义是以.(点号)分隔的。

    Headers类型的exchange使用的比较少。以后再说。

    下面主要用代码,实现一下direct、fanout、topic的效果。

  • 相关阅读:
    268. Missing Number
    Java中Synchronized的用法
    构造器里面的super()有什么用?到底写不写?
    android studio快速导入其他人的项目,避免下载gradle长时间卡住
    开发中遇到的问题---【Feign远程调用时,@PathVariable 注解中的value属性不能省略】
    我爱java系列---【次日凌晨00:00:00生效】
    开发中遇到的问题---【feign的多参数问题】
    我爱java系列---【springboot中分页插件pagehelper自定义返回结果类型】
    开发中遇到的问题---【当类型设置为Integer时,传入的值为0,会将其转化为空字符串,从而造成查询数据异常】
    我爱java系列---【mysql中的数据类型和java中的数据类型的对应】
  • 原文地址:https://www.cnblogs.com/telwanggs/p/7124578.html
Copyright © 2011-2022 走看看