zoukankan      html  css  js  c++  java
  • RabbitMQ安装与消息模型

    RabbitMQ安装与消息模型

    通过docker安装rabbitmq

    #拉起镜像-management带控制台
    docker pull rabbitmq:management
    #运行容器
    docker run -d --hostname "主机名" --name mq1 -e RABBITMQ_DEFAULT_USER="账号" -e RABBITMQ_DEFAULT_PASS="密码" -p 15672:15672 -p 5672:5672 rabbitmq:management
    #进入容器
    docker exec -it mq1 bash
    #查看服务状态
    rabbitmqctl status
    #查看插件列表
    rabbitmq-plugins list
    #测试访问
    curl localhost:15672
    

    消息模型

    依赖

    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
    

    WEB操作

    新建一个虚拟主机

    新建一个用户,并点击用户名

    设置用户的权限,允许其访问ems主机

    封装工具类

    private static ConnectionFactory connectionFactory;
    
    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("122.51.70.176");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123456");
    }
    
    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    
    public static void close(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    

    直连模型

    生产者发送消息到队列, 消费者从中取出消息.

    生产者
    @Test
    public void helloWorld() throws IOException{
        Connection connection = MQUtils.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //通道绑定消息队列, 队列名hello, 队列不持久化, 非独占, 不自动删除, 附加参数
        channel.queueDeclare("hello", false, false, false, null);
        //发布消息, 不指定交换机, 队列名, 传递消息的额外设置, 消息内容
        channel.basicPublish("", "hello", null, "hello rabbitmq".getBytes());
        //关闭通道
        MQUtils.close(channel, connection);
    }
    
    消费者
    public static void main(String[] args) throws IOException {
        Connection connection = MQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello", false, false, false, null);
        //消费消息, 队列名, 消息自动确认, 消费的回调接口
        channel.basicConsume("hello", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //打印获得的消息
                System.out.println(new String(body));
            }
        });
    }
    

    工作队列模型(work queues)

    让多个消费者绑定到一个队列, 共同消费队列中的消息, 该模型下默认消费者获取消息的方式是轮询. 消费者消费的信息数量是平均的.

    生产者
    @Test
    public void workQueues() throws IOException {
        Connection connection = MQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work", true, false, false, null);
        for (int i = 0; i < 50; i++) {
            channel.basicPublish("", "work", null, ("hello work queues" + i).getBytes());
        }
        MQUtils.close(channel, connection);
    }
    
    消费者
    //消费者1
    public static void main(String[] args) throws IOException {
        Connection connection = MQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work", true, false, false, null);
        channel.basicConsume("work", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
                System.out.println("消费者1" + new String(body));
            }
        });
    }
    //消费者2
    public static void main(String[] args) throws IOException {
        Connection connection = MQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work", true, false, false, null);
        channel.basicConsume("work", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
                System.out.println("消费者2" + new String(body));
                try {
                    //慢处理
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    

    两个消费者获得的消息数量都是25条, 消费者1很快就完成了任务, 但是消费者2加班了很久. 需要优化改进,让消费者能者多劳.

    改进后的消费者
    //消费者1
    public static void main(String[] args) throws IOException {
        Connection connection = MQUtils.getConnection();
        final Channel channel = connection.createChannel();
        //设通道上的消息容量为1
        channel.basicQos(1);
        channel.queueDeclare("work", true, false, false, null);
        //把自动确认设置为false
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1" + new String(body));
                //手动确认消息, 不开启多条消息确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
    //消费者2
    public static void main(String[] args) throws IOException {
        Connection connection = MQUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.basicQos(1);
        channel.queueDeclare("work", true, false, false, null);
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    

    广播模型(发布-订阅模型)

    可以有多个消费者, 每个消费者都有自己的消费队列, 队列绑定到交换机上, 交换机从生产者接收消息. 最终可以实现一条消息被多个消费者消费.

    生产者
    @Test
    public void fanout() throws IOException {
        Connection connection = MQUtils.getConnection();
        Channel channel = connection.createChannel();
        //指定交换机, 交换机名为news, 交换机类型为fanout
        channel.exchangeDeclare("news","fanout");
        for (int i = 0; i < 10; i++) {
            //消息发布, 选择交换机, 无路由key, 无其他设置, 消息内容
            channel.basicPublish("news","",null, ("hello fanout" + i).getBytes());
        }
        MQUtils.close(channel, connection);
    }
    
    n个消费者
    public static void main(String[] args) throws IOException {
        Connection connection = MQUtils.getConnection();
        Channel channel = connection.createChannel();
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定队列
        channel.queueBind(queue, "news", "");
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者n" + new String(body));
            }
        });
    }
    

    直连模型(静态路由模型)

    在路由的直连(Direct)模式下, 队列与交换机之间不再是任意绑定了, 而是要指定一个路由Key, 消息的发送也必须要指定上路由Key, 交换机根据对消息路由Key进行判断, 只有相匹配的情况下消费者才会接收到消息.

    生产者
    @Test
    public void direct() throws IOException {
        Connection connection = MQUtils.getConnection();
        Channel channel = connection.createChannel();
        //指定交换机, 交换机名为mail, 交换机类型为direct
        channel.exchangeDeclare("mail", "direct");
        for (int i = 0; i < 10; i++) {
            //消息发布, 选择交换机, 无路由key, 无其他设置, 消息内容
            if (i == 4 | i == 8) {
                channel.basicPublish("mail", "vip", null, ("vip消息" + i).getBytes());
                continue;
            }
            channel.basicPublish("mail", "user", null, ("用户消息" + i).getBytes());
        }
        MQUtils.close(channel, connection);
    }
    
    消费者
    //消费者1-普通用户
    public static void main(String[] args) throws IOException {
        Connection connection = MQUtils.getConnection();
        Channel channel = connection.createChannel();
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定队列
        channel.queueBind(queue, "mail", "user");
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者1" + new String(body));
            }
        });
    }
    //消费者1-vip用户
    public static void main(String[] args) throws IOException {
        Connection connection = MQUtils.getConnection();
        Channel channel = connection.createChannel();
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定多个队列
        channel.queueBind(queue, "mail", "user");
        channel.queueBind(queue, "mail", "vip");
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者2" + new String(body));
            }
        });
    }
    

    订阅模型(动态路由模型)

    在路由的订阅(Topic)模式下, 也是使用路由Key来分配消息的, 不过在绑定路由Key的时候可以使用通配符, 一般路由Key可以由一到多个单词组成, 单词之间使用"."进行分割.

    发布者
    @Test
    public void Topic() throws IOException {
        Connection connection = MQUtils.getConnection();
        Channel channel = connection.createChannel();
        //指定交换机, 交换机名为notice, 交换机类型为topic
        channel.exchangeDeclare("notice", "topic");
        for (int i = 0; i < 10; i++) {
            //消息发布, 选择交换机, 无路由key, 无其他设置, 消息内容
            if (i == 4) {
                channel.basicPublish("notice", "user.vip.msg", null, ("vip消息" + i).getBytes());
                continue;
            }else if (i == 8) {
                channel.basicPublish("notice", "user.vip.present", null, ("vip礼物" + i).getBytes());
                continue;
            }
            channel.basicPublish("notice", "user.msg", null, ("用户消息" + i).getBytes());
        }
        MQUtils.close(channel, connection);
    }
    
    消费者
    //消费者1
    public static void main(String[] args) throws IOException {
        Connection connection = MQUtils.getConnection();
        Channel channel = connection.createChannel();
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定队列, *任意一个字词, #任意数量字词
        channel.queueBind(queue, "notice", "user.*");
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者1" + new String(body));
            }
        });
    }
    //消费者2
    public static void main(String[] args) throws IOException {
        Connection connection = MQUtils.getConnection();
        Channel channel = connection.createChannel();
        //临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定队列, *任意一个字词, #任意数量字词
        channel.queueBind(queue, "notice", "user.#");
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者2" + new String(body));
            }
        });
    }
    
  • 相关阅读:
    log4j 使用笔记整理中
    执行bat文件
    excel让每个单元格的宽度随着字体自动变动的两种方式(有更好方法的大神,请忽略,求评论下)
    XML中CDATA及其字符实体的使用
    Java文件读写操作指定编码方式。。。。。
    尾数为0零BigDecimal不能装成正常数
    jquery 自动补全控件(支持IE6)待整理
    $.ajax提交,后台接受到的值总是乱码?明天再总结
    js定义变量需赋予初始值
    存储过程的优缺点
  • 原文地址:https://www.cnblogs.com/pinked/p/13697056.html
Copyright © 2011-2022 走看看