zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列

    1.相关术语

    Producer(生产者):发送消息的一方。

    Consumer(消费者):接收消息的一方。

    Broker(消息中间件的服务节点:rabbitmq服务器, 生产者将消息发布到broker中, 消费者从broker中订阅消息

    RoutingKey(路由键):生产者将消息发送到交换器的时候会指定一个RoutingKey,用来指定路由规则,且Routingkey要与交换器类型和BindingKey(绑定键) 联合使用才能生效。

    Binding(绑定):将交换器和队列关联起来,绑定的时候一般会指定一个绑定键,这样rabbitmq就知道如何正确将消息路由到队列了。

    Queue(队列):存储消息的缓冲区。

    Exchange(路由器/交换器):接收生产者发来的消息,并将这些消息推送到队列中。

    Exchange一共有四种类型:fanout,direct,topic,header 。

    • Fanout:会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
    • Direct:会把消息路由到哪些BindingKey和 RoutingKey完全匹配的队列中
    • Topic:direct类型交换器类似不同之处在于在匹配BindingKey和 RoutingKey的规则上做了扩展
    • Headers:根据发送的消息内容中的headers属性进行匹配

    Publish/Subscribe(发布订阅模式):一个消息可以发送到多个消费者中。

    2.常用方法

    1)连接到RabbitMQ(AMQP代理)

      方法1:设定参数连接

    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(userName);//用户名
    factory.setPassword(password);//密码
    factory.setVirtualHost(virtualHost);//虚拟主机
    factory.setHost(hostName);//主机名
    factory.setPort(portNumber);//端口号
    Connection connection= factory.newConnection();

      方法2:通过url连接

    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://username:password@ipAdress:portNum/virtualhost");
    Connection connection= factory.newConnection();

    2)创建通道

    Channel channel=connection.createChannel();

    3)声明消息队列

    channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments);
    • durable:设置是否持久化持久化可将交换器存盘,服务器重启不会丢失相关信息true、false true:在服务器重启时,能够存活
    • exclusive:是否为当前连接的专用队列在连接断开后,会自动删除该队列。
    • autodelete:是否自动删除当没有任何消费者使用时,自动删除该队列。
    • Arguments:设置队列的其他一些参数: 比如x-message-ttl

    4)声明交换机

    channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete);
    • type:direct、fanout、topic三种

    5)绑定交换机和队列。

    channel.queueBind(String queue, String exchangeName, routingKey);
    • routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用

    6)发布消息

    channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);
    • props:消息的基本属性,其中包括14个属性成员       
    • body:消息体(payload) 真正需要发送的消息

    7)收消息

    channel.basicConsume(String queue,boolean autoAck, Consumer callback);
    • autoAck:是否自动确认消息,true自动确认,false要手动调用
    • callback:消费者 DefaultConsumer建立使用,重写其中的方法
  • 相关阅读:
    JavaScript数组操作
    cxf-rs 和 swagger 的点
    cxf-rs 、spring 和 swagger 环境配置切换【github 有项目】
    (二)swagger-springmvc
    (二)spring-mvc-showcase 和 swagger-springmvc 的恩恩怨怨
    svn 创建tag
    swagger 入门
    jax-rs
    swagger core 和 swagger ui 如何关联【窥探】
    配置 struts2 时掉进 web.xml 的坑
  • 原文地址:https://www.cnblogs.com/huozhonghun/p/10862753.html
Copyright © 2011-2022 走看看