zoukankan      html  css  js  c++  java
  • RabbitMQ 消息中间件

    RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。
    支持消息的持久化、事务、拥塞控制、负载均衡等特性,使得RabbitMQ拥有更加广泛的应用场景。

    场景1:单发送单接收 

    使用场景:简单的发送与接收,没有特别的处理。

    package com.qa.util;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConnectionUtil {
    
        private static ConnectionFactory factory;
    
        static{
            //连接rabbitmq服务器
            factory = new ConnectionFactory();
            factory.setHost("192.168.189.128");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/admin");
        }
    
        public static Connection getConnection(){
            Connection connection = null;
            try {
                connection = factory.newConnection();
                return connection;
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
    public class Provider {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取通道Channel对象,后续所有的操作都是基于Channel实现
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("queue1",false,false,false,null);
            //给队列发送消息
            channel.basicPublish(
                    "",
                    "queue1",
                    null,
                    "Hello RabbitMQ".getBytes("utf-8"));
            //断开链接
            connection.close();
        }
    }
    public class Consumer {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取Channel对象
            Channel channel = connection.createChannel();
            //监听队列中的消息
            channel.basicConsume("queue1",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费端获得消息:"+new String(body,"utf-8"));
                }
            });
        }
    }

    场景2:单发送多接收

    使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。

    public class Provider {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取通道Channel对象,后续所有的操作都是基于Channel实现
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("queue1",false,false,false,null);
            //给队列发送消息
            for (int i=1;i<=10;i++){
                channel.basicPublish(
                        "",
                        "queue1",
                        null,
                        ("Hello RabbitMQ"+i).getBytes("utf-8"));
            }
            //断开链接
            connection.close();
        }
    }
    public class Consumer {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取Channel对象
            Channel channel = connection.createChannel();
            //监听队列中的消息
            channel.basicConsume("queue1",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费端1获得消息:"+new String(body,"utf-8"));
                }
            });
        }
    }
    public class Consumer2 {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取Channel对象
            Channel channel = connection.createChannel();
            //监听队列中的消息
            channel.basicConsume("queue1",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费端2获得消息:"+new String(body,"utf-8"));
                }
            });
        }
    }

    场景3:Publish/Subscribe

    使用场景:发布、订阅模式,发送端发送广播消息,多个接收端接收。

    public class Provider {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取通道Channel对象,后续所有的操作都是基于Channel实现
            Channel channel = connection.createChannel();
            //创建交换机
            channel.exchangeDeclare("fanout_exchange","fanout");
            //给交换机发送消息
            for (int i=1;i<=10;i++){
                channel.basicPublish(
                        "fanout_exchange",
                        "",
                        null,
                        ("Hello RabbitMQ"+i).getBytes("utf-8"));
            }
            //断开链接
            connection.close();
        }
    }
    public class  Consumer {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取Channel对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("fanout_queue1",false,false,false,null);
            //绑定交换机
            channel.queueBind("fanout_queue1","fanout_exchange","");
            //监控队列
            channel.basicConsume("fanout_queue1",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获取消息:"+new String(body,"utf-8"));
                }
            });
        }
    }
    public class Consumer2 {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取Channel对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("fanout_queue2",false,false,false,null);
            //绑定交换机
            channel.queueBind("fanout_queue2","fanout_exchange","");
            //监控队列
            channel.basicConsume("fanout_queue2",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获取消息:"+new String(body,"utf-8"));
                }
            });
        }
    }

    场景4:Routing (按路线发送接收)

    使用场景:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。

    public class Provider {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取通道Channel对象,后续所有的操作都是基于Channel实现
            Channel channel = connection.createChannel();
            //创建交换机
            channel.exchangeDeclare("direct_exchange","direct");
            //给交换机发送消息
            for (int i=1;i<=10;i++){
                channel.basicPublish(
                        "direct_exchange",
                        "insert", //路由键(消费端绑定交换机对应的路由键为insert,才能监听到该消息)
                        null,
                        ("Hello RabbitMQ"+i).getBytes("utf-8"));
            }
            //断开链接
            connection.close();
        }
    }
    public class  Consumer {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取Channel对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("direct_queue1",false,false,false,null);
            //绑定交换机
            channel.queueBind("direct_queue1","direct_exchange","select"); //路由键
            //监控队列
            channel.basicConsume("direct_queue1",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获取消息:"+new String(body,"utf-8"));
                }
            });
        }
    }
    public class Consumer2 {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取Channel对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("direct_queue2",false,false,false,null);
            //绑定交换机
            channel.queueBind("direct_queue2","direct_exchange","select"); //路由键
            channel.queueBind("direct_queue2","direct_exchange","insert");
            //监控队列
            channel.basicConsume("direct_queue2",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获取消息:"+new String(body,"utf-8"));
                }
            });
        }
    }

    场景5:Topics (按topic发送接收)

    使用场景:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。

    public class Provider {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取通道Channel对象,后续所有的操作都是基于Channel实现
            Channel channel = connection.createChannel();
            //创建交换机
            channel.exchangeDeclare("topic_exchange","topic");
            //给交换机发送消息
            for (int i=1;i<=10;i++){
                channel.basicPublish(
                        "direct_exchange",
                        "emp.hello", //路由键
                        null,
                        ("Hello RabbitMQ"+i).getBytes("utf-8"));
            }
            //断开链接
            connection.close();
        }
    }
    public class  Consumer {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取Channel对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("topic_queue1",false,false,false,null);
            //绑定交换机
            channel.queueBind("topic_queue1","topic_exchange","emp.#"); //emp.# 路由键
            //监控队列
            channel.basicConsume("topic_queue1",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获取消息:"+new String(body,"utf-8"));
                }
            });
        }
    }
    public class Consumer2 {
        public static void main(String[] args) throws Exception {
            Connection connection =  ConnectionUtil.getConnection();
            //获取Channel对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("topic_queue2",false,false,false,null);
            //绑定交换机
            channel.queueBind("topic_queue2","topic_exchange","emp.*"); //emp.* 路由键
            //监控队列
            channel.basicConsume("topic_queue2",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获取消息:"+new String(body,"utf-8"));
                }
            });
        }
    }

    SpringBoot整合RabbitMQ: https://www.cnblogs.com/wakey/p/10701454.html

  • 相关阅读:
    Django 07模型层—单表操作(增删改查)
    Django 05(模板-变量、过滤器、 标签 )
    Django 04(url与views相关内容)
    路由基础及反向解析
    Django项目基础
    Django框架导读
    异常处理
    Docker
    Docker基本概念
    Docker架构
  • 原文地址:https://www.cnblogs.com/wakey/p/12698798.html
Copyright © 2011-2022 走看看