zoukankan      html  css  js  c++  java
  • RabbitMQ学习笔记1-hello world

    安装过程略过,一搜一大把。

    rabbitmq管理控制台:http://localhost:15672/   默认账户:guest/guest

    RabbitMQ默认监听端口:5672

    JAVA API地址:http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.6.5/rabbitmq-java-client-javadoc-3.6.5/

    引入rabbitmq-java-client-bin-3.6.5下的jar包:

    简单的基本流程是:

    生产者:

    1、连接到RabbitMQ
    2、获取channel
    3、声明exchange
    4、创建并发布消息
    5、关闭信道和连接

    消费者:

    1、连接到RabbitMQ
    2、获取channel
    3、声明exchange
    4、声明Queue
    5、使用路由键将exchange和queue绑定
    6、消费消息
    7、关闭信道和连接

    1、消息生产者SenderWithExchange.java

     1 package com.yzl.test1;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import com.rabbitmq.client.Connection;
     5 import com.rabbitmq.client.ConnectionFactory;
     6 
     7 /**
     8  * 使用交换器的生产者
     9  * @author: yzl
    10  * @date: 2016-10-16
    11  */
    12 public class SenderWithExchange {
    13     //路由键名称
    14     private final static String ROUTING_KEY = "rk";
    15 
    16     public static void main(String[] args) throws Exception {
    17         //连接到rabbitmq服务器
    18         ConnectionFactory factory = new ConnectionFactory();
    19         factory.setHost("localhost");
    20         Connection connection = factory.newConnection();
    21         //创建一个信道
    22         Channel channel = connection.createChannel();
    23         //声明direct类型的交换器
    24         channel.exchangeDeclare("myDirectExchange","direct");
    25         
    26         String msg = null;
    27         for(int i=1; i<100; i++){
    28             msg = "hello world" + i;
    29             //发送消息给myDirectExchange交换器,并且路由键是rk
    30             channel.basicPublish("myDirectExchange", ROUTING_KEY, null, msg.getBytes());
    31             System.out.println("Sender send new msg:" + msg);
    32             Thread.sleep(2000);
    33         }
    34         
    35         //关闭信道
    36         channel.close();
    37         //关闭连接
    38         connection.close();
    39     }
    40 }

    2、消息消费者ReceiverWithExchange.java

     1 package com.yzl.test1;
     2 
     3 import java.io.IOException;
     4 
     5 import com.rabbitmq.client.AMQP.BasicProperties;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.ConnectionFactory;
     9 import com.rabbitmq.client.Consumer;
    10 import com.rabbitmq.client.DefaultConsumer;
    11 import com.rabbitmq.client.Envelope;
    12 
    13 /**
    14  * 消息消费者
    15  * @author: yzl
    16  * @date: 2016-10-16
    17  */
    18 public class ReceiverWithExchange {
    19     //消息队列名称
    20     private final static String QUEUE_NAME = "hello";
    21     //路由键名称
    22     private final static String ROUTING_KEY = "rk";
    23     /**
    24      * @param args
    25      */
    26     public static void main(String[] args) throws Exception{
    27         ConnectionFactory factory = new ConnectionFactory();
    28         factory.setHost("localhost");
    29         Connection connection = factory.newConnection();
    30         Channel channel = connection.createChannel();
    31         //定义队列
    32         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    33         //定义direct类型的交换器
    34         channel.exchangeDeclare("myDirectExchange","direct");
    35         //通过路由键 将 队列和交换器绑定起来
    36         channel.queueBind(QUEUE_NAME, "myDirectExchange", ROUTING_KEY);
    37         System.out.println("Receiver waiting to accept msg");
    38         
    39         //消息消费处理器
    40         Consumer consumer = new DefaultConsumer(channel){
    41             @Override
    42             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
    43                     throws IOException {
    44                 String msg = new String(body);
    45                 System.out.println("accept a new msg :" + msg);
    46             }
    47         };
    48         //绑定消息处理器
    49         channel.basicConsume(QUEUE_NAME, true, consumer);
    50     }
    51 
    52 }

    如果先运行SenderWithExchange后运行ReceiverWithExchange将会丢失一部分时间内的数据。反之则没问题,具体原因可以看RabbitMQ学习笔记2-理解消息通信

    direct交换器是使用for循环的形式一个一个交替发给所有绑定的队列上去

  • 相关阅读:
    PAT 1006 Sign In and Sign Out
    PAT 1004. Counting Leaves
    JavaEE开发环境安装
    NoSql数据库探讨
    maven的配置
    VMWARE 下使用 32位 Ubuntu Linux ,不能给它分配超过3.5G 内存?
    XCODE 4.3 WITH NO GCC?
    在苹果虚拟机上跑 ROR —— Ruby on Rails On Vmware OSX 10.7.3
    推荐一首让人疯狂的好歌《Pumped Up Kicks》。好吧,顺便测下博客园可以写点无关技术的帖子吗?
    RUBY元编程学习之”编写你的第一种领域专属语言“
  • 原文地址:https://www.cnblogs.com/yangzhilong/p/5817333.html
Copyright © 2011-2022 走看看