zoukankan      html  css  js  c++  java
  • RabbitMQ JAVA客户端调用

    1.安装erlang

      下载地址:http://www.erlang.org/downloads

      设置ERLANG环境变量

    2.安装RabbitMQ

      下载地址: http://www.rabbitmq.com/download.html

    输入命令安装各种管理插件:

    D:RabbitMQServer
    abbitmq_server-3.7.10sbin>rabbitmq-plugins enable rabbitmq_management

    重启服务

    net stop rabbitmq && net start rabbitmq

    登录

    http://127.0.0.1:15672 默认用户名密码 guest  guest

    常用命令(RabbitMQ命令在sbin目录下D:RabbitMQServer abbitmq_server-3.7.10sbin,记得设置环境变量)

    rabbitmqctl delete_vhost test_vhosts 删除虚拟机test_vhosts 

    3. RabbitMQ知识整理

    来自(https://blog.csdn.net/dreamchasering/article/details/77653512)

    什么是MQ?

          MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。

          RabbitMQ是MQ的一种。下面详细介绍一下RabbitMQ的基本概念。

          1、队列、生产者、消费者

          队列是RabbitMQ的内部对象,用于存储消息。生产者(下图中的P)生产消息并投递到队列中,消费者(下图中的C)可以从队列中获取消息并消费。

          

          多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

          

    2、Exchange、Binding

          刚才我们看到生产者将消息投递到队列中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),再通过Binding将Exchange与Queue关联起来。

          

    3、Exchange Type、Bingding key、routing key

          在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key。在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。

          生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。

          RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

          fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

          direct:把消息投递到那些binding key与routing key完全匹配的队列中。

          topic:将消息路由到binding key与routing key模式匹配的队列中。

          附上一张RabbitMQ的结构图:

          

        

    最后来具体解析一下几个问题:

    1、可以自动创建队列,也可以手动创建队列,如果自动创建队列,那么是谁负责创建队列呢?是生产者?还是消费者? 

          如果队列不存在,当然消费者不会收到任何的消息。但是如果队列不存在,那么生产者发送的消息就会丢失。所以,为了数据不丢失,消费者和生产者都可以创建队列。那么如果创建一个已经存在的队列呢?那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是队列属性并不会改变。

          队列对于负载均衡的处理是完美的。对于多个消费者来说,RabbitMQ使用轮询的方式均衡的发送给不同的消费者。

    2、RabbitMQ的消息确认机制

          默认情况下,如果消息已经被某个消费者正确的接收到了,那么该消息就会被从队列中移除。当然也可以让同一个消息发送到很多的消费者。

          如果一个队列没有消费者,那么,如果这个队列有数据到达,那么这个数据会被缓存,不会被丢弃。当有消费者时,这个数据会被立即发送到这个消费者,这个数据被消费者正确收到时,这个数据就被从队列中删除。

         那么什么是正确收到呢?通过ack。每个消息都要被acknowledged(确认,ack)。我们可以显示的在程序中去ack,也可以自动的ack。如果有数据没有被ack,那么:

         RabbitMQ Server会把这个信息发送到下一个消费者。

         如果这个app有bug,忘记了ack,那么RabbitMQServer不会再发送数据给它,因为Server认为这个消费者处理能力有限。

        而且ack的机制可以起到限流的作用(Benefitto throttling):在消费者处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的均衡消费者的负载。

    4.JAVA demo

    引入RabbitMQ客户端

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5</version>
    </dependency>

    3.1 使用默认配置直接发送消息到队列

    生产者

    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 
     * 默认发送,直接将消息发送到某个队列,默认交换机type为direct
     * 
     * @author
     * @date 2019/01/10 11:17:10
     */
    public class ProducterDirectDemo {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            String queneName = "testQuene";
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                factory.setPort(5672);
                factory.setUsername("guest");
                factory.setPassword("guest");
                factory.setVirtualHost("test_vhosts");
                // 创建与RabbitMQ服务器的TCP连接
                connection = factory.newConnection();
                // 创建一个频道
                channel = connection.createChannel();
                // 声明默认的队列
                channel.queueDeclare(queneName, true, false, true, null);
                while (true) {
                    channel.basicPublish("", queneName, null, UUID.randomUUID().toString().getBytes());
                    Thread.sleep(1000);
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    消费者

    import java.io.IOException;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    /**
     * 接收默认消息
     * 
     * @author
     * @date 2019/01/10 11:14:32
     */
    public class ConsumerDirectDemo {
        public static void main(String[] args) {
            String queneName = "testQuene";
            Connection connection = null;
            Channel channel = null;
            try {
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                factory.setPort(5672);
                factory.setUsername("guest");
                factory.setPassword("guest");
                factory.setVirtualHost("test_vhosts");
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                            byte[] body) throws IOException {
                        String message = new String(body, "UTF-8");
                        System.out.println(envelope.getExchange() + "," + envelope.getRoutingKey() + "," + message);
                    }
                };
                // channel绑定队列,autoAck为true表示一旦收到消息则自动回复确认消息
                channel.basicConsume(queneName, true, consumer);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    3.2 设置交换器,队列,路由发送消息

    生产者

    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 指定交换机,队列,路由key方式
     * 
     * @author
     * @date 2019/01/10 11:19:38
     */
    public class ProducterAllDemo {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            String queneName = "firstQueue";
            String exchangeName = "amq.fanout";
            String routingKey = "test1";
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                factory.setPort(5672);
                factory.setUsername("guest");
                factory.setPassword("guest");
                factory.setVirtualHost("test_vhosts");
    
                // 创建与RabbitMQ服务器的TCP连接
                connection = factory.newConnection();
                // 创建一个频道
                channel = connection.createChannel();
                // 声明交换机类型
                channel.exchangeDeclare("amq.fanout", "fanout", true);
                // 声明默认的队列 (也可不申明队列,使用默认队列)
                channel.queueDeclare(queneName, true, false, true, null);
                // String queue = channel.queueDeclare().getQueue();
                // 将队列与交换机绑定
                channel.queueBind(queneName, exchangeName, routingKey);
                // 指定一个队列
                // channel.queueDeclare(queneName, false, false, false, null);
                while (true) {
                    channel.basicPublish(exchangeName, routingKey, null, UUID.randomUUID().toString().getBytes());
                    Thread.sleep(1000);
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
    
        }
    }

    消费者

    import java.io.IOException;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    /**
     * 
     * @author
     * @date 2019/01/10 11:19:42
     */
    public class ConsumerAllDemo {
        public static void main(String[] args) {
            String queneName = "firstQueue";
            String exchangeName = "amq.fanout";
            String routingKey = "test1";
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                factory.setPort(5672);
                factory.setUsername("guest");
                factory.setPassword("guest");
                factory.setVirtualHost("test_vhosts");
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 声明交换机类型
                channel.exchangeDeclare(exchangeName, "fanout", true);
                // 声明默认的队列(也可不申明队列,使用默认队列)
                channel.queueDeclare(queneName, true, false, true, null);
                // String queue = channel.queueDeclare().getQueue();
                // 将队列与交换机绑定
                channel.queueBind(queneName, exchangeName, routingKey);
    
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                            byte[] body) throws IOException {
                        String message = new String(body, "UTF-8");
                        System.out.println(envelope.getExchange() + "," + envelope.getRoutingKey() + "," + message);
                    }
                };
                // channel绑定队列、消费者,autoAck为true表示一旦收到消息则自动回复确认消息
                channel.basicConsume(queneName, true, consumer);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }
  • 相关阅读:
    ZOJ 3603字符串操作
    ZOJ 3609 求逆元
    HDOJ 4007 Dave【最大覆盖集】
    HDOJ4006 The kth great number 【串的更改和维护】
    【集训笔记】博弈论相关知识【HDOJ 1850【HDOJ2147
    【集训笔记】母函数【母函数模板】【HDOJ1028【HDOJ1085
    【集训笔记】【大数模板】特殊的数 【Catalan数】【HDOJ1133【HDOJ1134【HDOJ1130
    【集训笔记】动态规划背包问题【HDOJ1421【HDOJ1058【HDOJ2546
    【集训笔记】动态规划【HDOJ1159【HDOJ1003
    【集训笔记】二分图及其应用【HDOJ1068【HDOJ1150【HDOJ1151
  • 原文地址:https://www.cnblogs.com/zincredible/p/10249502.html
Copyright © 2011-2022 走看看