zoukankan      html  css  js  c++  java
  • java操作RabbitMq

    最近学习了一下RabbitMq,记录一下,主要是java对RabbitMq的一些基本操作,后期会更新springboot集成RabbitMq的文章以及git地址

    一、扇形交换机

    扇形交换机是最基本的交换机类型,它所能做的事情非常简单——广播消息。扇形交换机会把接收到的消息全部转发给绑定在自己身上的所有队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有交换机类型里最快的。

    1、rabbitmq链接工具类

    package com.heyu.rabbitmq.demo.fanout;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Connection factory helper for the fanout-exchange demo.
     *
     * <p>Host, port, credentials and virtual host are hard-coded demo values.
     *
     * @author HEYU (created 2020/12/24 11:24)
     */
    public final class RabbitConnUtil {

        /** Static utility holder; not meant to be instantiated. */
        private RabbitConnUtil() {
        }

        /**
         * Opens a new connection to the broker at 127.0.0.1:5672, virtual host "heyu",
         * authenticating as user "heyu".
         *
         * <p>Every call builds a fresh {@link ConnectionFactory} and a fresh connection;
         * the caller owns the returned connection and is responsible for closing it.
         *
         * @return the connection returned by {@link ConnectionFactory#newConnection()}
         * @throws IOException propagated from {@link ConnectionFactory#newConnection()}
         * @throws TimeoutException propagated from {@link ConnectionFactory#newConnection()}
         */
        public static Connection getConn() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("heyu");
            connectionFactory.setPassword("heyu123");
            connectionFactory.setVirtualHost("heyu");
            return connectionFactory.newConnection();
        }
    }
    
    

    2、生产者

    package com.heyu.rabbitmq.demo.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Producer for the fanout-exchange demo: declares the {@code peng_fanout} exchange and
     * publishes a single UTF-8 text message to it.
     *
     * <p>Author's note: the exchange does not store messages, it only forwards them.
     *
     * @author HEYU (created 2020/12/24 14:15)
     */
    public class FanoutProducer {
        private static final String EXCHANGE_NAME = "peng_fanout";

        /**
         * Connects, declares the exchange, publishes one message and disconnects.
         * Failures are printed to stderr.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // try-with-resources closes the channel and then the connection even when
            // declaring the exchange or publishing throws, so nothing is leaked on failure.
            try (Connection conn = RabbitConnUtil.getConn();
                 Channel channel = conn.createChannel()) {
                // Declare the exchange with type "fanout".
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                String message = "交换机不做转存 只做转发";
                // Empty routing key, no message properties.
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    3、消费者

    package com.heyu.rabbitmq.demo.fanout;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * "Email" consumer for the fanout-exchange demo: binds {@code peng_email_queue} to the
     * {@code peng_fanout} exchange and prints every message it receives, acknowledging each
     * one manually.
     *
     * @author HEYU (created 2020/12/24 15:21)
     */
    public class EmailConsumer {
        private static final String EMAIL_QUEUE_NAME = "peng_email_queue";
        private static final String EXCHANGE_NAME = "peng_fanout";

        /**
         * Sets up the queue, binding and consumer. The connection is intentionally left open
         * so that deliveries keep arriving. Setup failures are printed to stderr.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            try {
                Connection conn = RabbitConnUtil.getConn();
                final Channel channel = conn.createChannel();
                // Declare the exchange with the same arguments the producer uses, so the
                // binding below does not fail when this consumer is started before the
                // producer has created the exchange.
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                // Arguments after the name: durable, exclusive, autoDelete (all false), no extra args.
                channel.queueDeclare(EMAIL_QUEUE_NAME, false, false, false, null);
                // Bind the queue to the exchange: queue name, exchange name, routing key.
                channel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "");
                DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String msg = new String(body, "UTF-8");
                        System.out.println("邮件消费者读取消息:" + msg);
                        // Manual ack of this single delivery (multiple = false).
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                };
                // autoAck = false: deliveries are acknowledged in handleDelivery.
                channel.basicConsume(EMAIL_QUEUE_NAME, false, defaultConsumer);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    package com.heyu.rabbitmq.demo.fanout;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * "SMS" consumer for the fanout-exchange demo: binds {@code peng_sms_queue} to the
     * {@code peng_fanout} exchange and prints every message it receives, using automatic
     * acknowledgement.
     *
     * @author HEYU (created 2020/12/24 14:16)
     */
    public class SmsConsumer {
        private static final String SMS_QUEUE_NAME = "peng_sms_queue";
        private static final String EXCHANGE_NAME = "peng_fanout";

        /**
         * Sets up the queue, binding and consumer. The connection is intentionally left open
         * so that deliveries keep arriving. Setup failures are printed to stderr.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            try {
                Connection conn = RabbitConnUtil.getConn();
                Channel channel = conn.createChannel();
                // Declare the exchange with the same arguments the producer uses, so the
                // binding below does not fail when this consumer is started before the
                // producer has created the exchange.
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                // Arguments after the name: durable, exclusive, autoDelete (all false), no extra args.
                channel.queueDeclare(SMS_QUEUE_NAME, false, false, false, null);
                // Bind the queue to the exchange with an empty routing key.
                channel.queueBind(SMS_QUEUE_NAME, EXCHANGE_NAME, "");
                DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String msg = new String(body, "UTF-8");
                        System.out.println("短信消费者读取消息:" + msg);
                    }
                };
                // autoAck = true: no explicit basicAck is sent by the handler.
                channel.basicConsume(SMS_QUEUE_NAME, true, defaultConsumer);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    二、主题交换机

    主题交换机上的消息需要携带符合指定规则的 routing key(路由键),主题交换机会根据队列绑定时指定的匹配规则,将消息转发到所有匹配的(一个或多个)队列。
    1、RabbitMq工具连接类

    package com.heyu.rabbitmq.demo.topic;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Connection factory helper for the topic-exchange demo.
     *
     * <p>Lives in the {@code topic} package because the topic producer and consumers call it
     * by simple name without importing it. Host, port, credentials and virtual host are
     * hard-coded demo values.
     *
     * @author HEYU (created 2020/12/24 11:24)
     */
    public final class RabbitConnUtil {

        /** Static utility holder; not meant to be instantiated. */
        private RabbitConnUtil() {
        }

        /**
         * Opens a new connection to the broker at 127.0.0.1:5672, virtual host "heyu",
         * authenticating as user "heyu".
         *
         * <p>Every call builds a fresh {@link ConnectionFactory} and a fresh connection;
         * the caller owns the returned connection and is responsible for closing it.
         *
         * @return the connection returned by {@link ConnectionFactory#newConnection()}
         * @throws IOException propagated from {@link ConnectionFactory#newConnection()}
         * @throws TimeoutException propagated from {@link ConnectionFactory#newConnection()}
         */
        public static Connection getConn() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("heyu");
            connectionFactory.setPassword("heyu123");
            connectionFactory.setVirtualHost("heyu");
            return connectionFactory.newConnection();
        }
    }
    
    

    2、生产者

    package com.heyu.rabbitmq.demo.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Producer for the topic-exchange demo (publish/subscribe): declares the
     * {@code peng_topic_fanout} exchange as a topic exchange and publishes a single UTF-8
     * text message with routing key {@code log}.
     *
     * @author HEYU (created 2020/12/24 14:15)
     */
    public class TopicProducer {
        private static final String EXCHANGE_NAME = "peng_topic_fanout";

        /**
         * Connects, declares the exchange, publishes one message and disconnects.
         * Failures are printed to stderr.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // try-with-resources closes the channel and then the connection even when
            // declaring the exchange or publishing throws, so nothing is leaked on failure.
            try (Connection conn = RabbitConnUtil.getConn();
                 Channel channel = conn.createChannel()) {
                // Must be "topic" so the routing key is actually matched against the
                // consumers' binding keys; a "fanout" exchange would ignore it.
                // NOTE: if the broker still holds this exchange from an earlier run with a
                // different type, delete it first - re-declaring with another type is rejected.
                channel.exchangeDeclare(EXCHANGE_NAME, "topic");
                String message = "routingKey 消息转发  测试";
                channel.basicPublish(EXCHANGE_NAME, "log", null, message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    3、消费者

    package com.heyu.rabbitmq.demo.topic;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * "Email" consumer for the topic-exchange demo: binds {@code topic_email_queue} to the
     * {@code peng_topic_fanout} exchange with binding key {@code log}, prints each received
     * message and acknowledges it manually.
     *
     * <p>This class does not declare the exchange itself.
     *
     * @author Melody (created 2020/12/24 15:21)
     */
    public class EmailConsumer {
        private static final String EMAIL_QUEUE_NAME = "topic_email_queue";
        private static final String EXCHANGE_NAME = "peng_topic_fanout";

        /**
         * Sets up the queue, binding and consumer; the connection is left open so deliveries
         * keep arriving. Setup failures are printed to stderr.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            try {
                Channel ch = RabbitConnUtil.getConn().createChannel();
                // Arguments after the name: durable, exclusive, autoDelete (all false), no extra args.
                ch.queueDeclare(EMAIL_QUEUE_NAME, false, false, false, null);
                // Bind the queue to the exchange: queue name, exchange name, routing key.
                ch.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "log");
                DefaultConsumer printer = new DefaultConsumer(ch) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("邮件消费者读取消息:" + new String(body, "UTF-8"));
                        // Manual ack of this single delivery on the consumer's own channel.
                        getChannel().basicAck(envelope.getDeliveryTag(), false);
                    }
                };
                // autoAck = false: deliveries are acknowledged in handleDelivery.
                ch.basicConsume(EMAIL_QUEUE_NAME, false, printer);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    package com.heyu.rabbitmq.demo.topic;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * "SMS" consumer for the topic-exchange demo: binds {@code topic_sms_queue} to the
     * {@code peng_topic_fanout} exchange with binding key {@code log}, prints each received
     * message and acknowledges it manually.
     *
     * <p>This class does not declare the exchange itself.
     *
     * @author Melody (created 2020/12/24 14:16)
     */
    public class SmsConsumer {
        private static final String SMS_QUEUE_NAME = "topic_sms_queue";
        private static final String EXCHANGE_NAME = "peng_topic_fanout";

        /**
         * Sets up the queue, binding and consumer; the connection is left open so deliveries
         * keep arriving. Setup failures are printed to stderr.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            try {
                Channel ch = RabbitConnUtil.getConn().createChannel();
                // Arguments after the name: durable, exclusive, autoDelete (all false), no extra args.
                ch.queueDeclare(SMS_QUEUE_NAME, false, false, false, null);
                // Bind the queue to the exchange with binding key "log".
                ch.queueBind(SMS_QUEUE_NAME, EXCHANGE_NAME, "log");
                DefaultConsumer printer = new DefaultConsumer(ch) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String text = new String(body, "UTF-8");
                        System.out.println("短信消费者读取消息:" + text);
                        long tag = envelope.getDeliveryTag();
                        // Manual ack of this single delivery on the consumer's own channel.
                        getChannel().basicAck(tag, false);
                    }
                };
                // autoAck = false: deliveries are acknowledged in handleDelivery.
                ch.basicConsume(SMS_QUEUE_NAME, false, printer);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    
  • 相关阅读:
    UVA 10462 Is There A Second Way Left?(次小生成树&Prim&Kruskal)题解
    POJ 1679 The Unique MST (次小生成树)题解
    POJ 2373 Dividing the Path (单调队列优化DP)题解
    BZOJ 2709 迷宫花园
    BZOJ 1270 雷涛的小猫
    BZOJ 2834 回家的路
    BZOJ 2506 calc
    BZOJ 3124 直径
    BZOJ 4416 阶乘字符串
    BZOJ 3930 选数
  • 原文地址:https://www.cnblogs.com/pengcool/p/15610886.html
Copyright © 2011-2022 走看看