zoukankan      html  css  js  c++  java
  • java操作RabbitMq

    最近学习了一下RabbitMq,记录一下,主要是java对RabbitMq的一些基本操作,后期会更新springboot集成RabbitMq的文章以及git地址

    一、扇形交换机

    扇形交换机是最基本的交换机类型,它所能做的事情非常简单———广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

    1、RabbitMq连接工具类

    package com.heyu.rabbitmq.demo.fanout;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Helper that opens connections to the RabbitMQ broker used by these demos.
     *
     * @author: HEYU
     * @date: 2020/12/24 11:24
     */
    public class RabbitConnUtil {
        /**
         * Opens a new connection to the broker at 127.0.0.1:5672, logging in as
         * user "heyu" on virtual host "heyu".
         *
         * @return a newly opened connection; nothing in this class closes it
         * @throws IOException      propagated from {@code ConnectionFactory.newConnection()}
         * @throws TimeoutException propagated from {@code ConnectionFactory.newConnection()}
         */
        public static Connection getConn() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            // Broker address.
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            // Virtual host and the credentials used to log in to it.
            factory.setVirtualHost("heyu");
            factory.setUsername("heyu");
            factory.setPassword("heyu123");
            return factory.newConnection();
        }
    }
    
    

    2、生产者

    package com.heyu.rabbitmq.demo.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Producer for the fanout-exchange demo. The exchange does not store
     * messages; it only forwards them.
     *
     * @author: HEYU
     * @date: 2020/12/24 14:15
     */
    public class FanoutProducer {
        private static final String EXCHANGE_NAME = "peng_fanout";

        /**
         * Declares the fanout exchange and publishes a single UTF-8 text message to it.
         * Any I/O or timeout failure is printed to standard error.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // try-with-resources closes the channel first and then the connection,
            // even when declaring the exchange or publishing throws. Without this a
            // failure would leave the connection open and keep the JVM alive.
            try (Connection conn = RabbitConnUtil.getConn();
                 Channel channel = conn.createChannel()) {
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                String message = "交换机不做转存 只做转发";
                // The routing key is left empty and no message properties are set.
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    3、消费者

    package com.heyu.rabbitmq.demo.fanout;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Email consumer for the fanout-exchange demo.
     *
     * @author: HEYU
     * @date: 2020/12/24 15:21
     */
    public class EmailConsumer {
        private static final String EMAIL_QUEUE_NAME = "peng_email_queue";
        private static final String EXCHANGE_NAME = "peng_fanout";

        /**
         * Declares the exchange and the email queue, binds them, and starts a
         * consumer that prints each message and acknowledges it manually.
         * The connection is left open so that deliveries keep arriving.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            try {
                Connection conn = RabbitConnUtil.getConn();
                Channel channel = conn.createChannel();
                // Declare the exchange here too, with the same type the producer uses,
                // so that binding does not fail when this consumer is started before
                // the producer has created the exchange.
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                // Arguments: queue name, durable=false, exclusive=false, autoDelete=false, no extra arguments.
                channel.queueDeclare(EMAIL_QUEUE_NAME, false, false, false, null);
                // Arguments: queue name, exchange name, routing key (empty here).
                channel.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "");
                DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String msg = new String(body, "UTF-8");
                        System.out.println("邮件消费者读取消息:" + msg);
                        // autoAck is off below, so acknowledge this single delivery explicitly.
                        long deliveryTag = envelope.getDeliveryTag();
                        channel.basicAck(deliveryTag, false);
                    }
                };
                channel.basicConsume(EMAIL_QUEUE_NAME, false, defaultConsumer);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    package com.heyu.rabbitmq.demo.fanout;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * SMS consumer for the fanout-exchange demo.
     *
     * @author: HEYU
     * @date: 2020/12/24 14:16
     */
    public class SmsConsumer {
        private static final String SMS_QUEUE_NAME = "peng_sms_queue";
        private static final String EXCHANGE_NAME = "peng_fanout";

        /**
         * Declares the exchange and the SMS queue, binds them, and starts an
         * auto-acknowledging consumer that prints each message.
         * The connection is left open so that deliveries keep arriving.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            try {
                Connection conn = RabbitConnUtil.getConn();
                Channel channel = conn.createChannel();
                // Declare the exchange here too, with the same type the producer uses,
                // so that binding does not fail when this consumer is started before
                // the producer has created the exchange.
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                // Arguments: queue name, durable=false, exclusive=false, autoDelete=false, no extra arguments.
                channel.queueDeclare(SMS_QUEUE_NAME, false, false, false, null);
                // Bind the queue to the exchange with an empty routing key.
                channel.queueBind(SMS_QUEUE_NAME, EXCHANGE_NAME, "");
                DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String msg = new String(body, "UTF-8");
                        System.out.println("短信消费者读取消息:" + msg);
                    }
                };
                // autoAck=true: no explicit basicAck is sent from handleDelivery.
                channel.basicConsume(SMS_QUEUE_NAME, true, defaultConsumer);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    二、主题交换机

    主题交换机上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列。绑定键中可以使用通配符:“*”匹配一个单词,“#”匹配零个或多个单词。
    1、RabbitMq连接工具类

    package com.heyu.rabbitmq.demo.topic;

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
     * Connection helper for the topic-exchange demo. It is declared in the
     * {@code topic} package because the topic producer and consumers refer to
     * {@code RabbitConnUtil} without an import.
     *
     * @author: HEYU
     * @date: 2020/12/24 11:24
     */
    public class RabbitConnUtil {
        /**
         * Opens a new connection to the broker at 127.0.0.1:5672, logging in as
         * user "heyu" on virtual host "heyu".
         *
         * @return a newly opened connection; nothing in this class closes it
         * @throws IOException      propagated from {@code ConnectionFactory.newConnection()}
         * @throws TimeoutException propagated from {@code ConnectionFactory.newConnection()}
         */
        public static Connection getConn() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setUsername("heyu");
            connectionFactory.setPassword("heyu123");
            connectionFactory.setVirtualHost("heyu");
            connectionFactory.setPort(5672);
            return connectionFactory.newConnection();
        }
    }
    
    

    2、生产者

    package com.heyu.rabbitmq.demo.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Producer for the topic-exchange (publish/subscribe) demo.
     *
     * @author: HEYU
     * @date: 2020/12/24 14:15
     */
    public class TopicProducer {
        private static final String EXCHANGE_NAME = "peng_topic_fanout";

        /**
         * Declares the topic exchange and publishes a single UTF-8 text message
         * with routing key "log". Any I/O or timeout failure is printed to
         * standard error.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // try-with-resources closes the channel first and then the connection,
            // even when declaring the exchange or publishing throws.
            try (Connection conn = RabbitConnUtil.getConn();
                 Channel channel = conn.createChannel()) {
                // The exchange must be of type "topic" for the routing key to be
                // matched against the queues' binding keys; a "fanout" exchange
                // ignores the routing key entirely.
                // NOTE: if an exchange with this name was already created as fanout
                // by an earlier run, delete it on the broker first, because
                // redeclaring an existing exchange with a different type is rejected.
                channel.exchangeDeclare(EXCHANGE_NAME, "topic");
                String message = "routingKey 消息转发  测试";
                channel.basicPublish(EXCHANGE_NAME, "log", null, message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    3、消费者

    package com.heyu.rabbitmq.demo.topic;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Email consumer for the topic demo.
     *
     * @author: Melody
     * @date: 2020/12/24 15:21
     */
    public class EmailConsumer {
        private static final String EMAIL_QUEUE_NAME = "topic_email_queue";
        private static final String EXCHANGE_NAME = "peng_topic_fanout";

        /**
         * Declares the email queue, binds it to the exchange with routing key
         * "log", and starts a consumer that prints each message and acknowledges
         * it manually. Setup failures are printed to standard error.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            try {
                final Channel mq = RabbitConnUtil.getConn().createChannel();
                // Arguments: queue name, durable=false, exclusive=false, autoDelete=false, no extra arguments.
                mq.queueDeclare(EMAIL_QUEUE_NAME, false, false, false, null);
                // Arguments: queue name, exchange name, routing key.
                mq.queueBind(EMAIL_QUEUE_NAME, EXCHANGE_NAME, "log");
                // autoAck=false: each delivery is acknowledged inside the handler.
                mq.basicConsume(EMAIL_QUEUE_NAME, false, new DefaultConsumer(mq) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("邮件消费者读取消息:" + new String(body, "UTF-8"));
                        mq.basicAck(envelope.getDeliveryTag(), false);
                    }
                });
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    package com.heyu.rabbitmq.demo.topic;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * SMS consumer for the topic demo.
     *
     * @author: Melody
     * @date: 2020/12/24 14:16
     */
    public class SmsConsumer {
        private static final String SMS_QUEUE_NAME = "topic_sms_queue";
        private static final String EXCHANGE_NAME = "peng_topic_fanout";

        /**
         * Declares the SMS queue, binds it to the exchange with routing key "log",
         * and registers a manually acknowledging consumer on it. Setup failures
         * are printed to standard error.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            try {
                Connection link = RabbitConnUtil.getConn();
                Channel ch = link.createChannel();
                // Arguments: queue name, durable=false, exclusive=false, autoDelete=false, no extra arguments.
                ch.queueDeclare(SMS_QUEUE_NAME, false, false, false, null);
                ch.queueBind(SMS_QUEUE_NAME, EXCHANGE_NAME, "log");
                // autoAck=false: the handler acknowledges each delivery itself.
                ch.basicConsume(SMS_QUEUE_NAME, false, printingConsumer(ch));
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }

        /** Builds the consumer that prints every delivery and then acknowledges it on {@code ch}. */
        private static DefaultConsumer printingConsumer(final Channel ch) {
            return new DefaultConsumer(ch) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String text = new String(body, "UTF-8");
                    System.out.println("短信消费者读取消息:" + text);
                    ch.basicAck(envelope.getDeliveryTag(), false);
                }
            };
        }
    }
    
    
  • 相关阅读:
    解决页面报错: GEThttp://localhost:8080/favicon.ico 404 (Not Found)
    vs2019快捷键整理
    js保存图片至本地
    PHP输出方式的区别
    js运算精度问题
    Hadoop相关问题
    2015年10月5日 12:49:07
    Hive数据倾斜
    Hadoop错误日志
    使用maven开发MR
  • 原文地址:https://www.cnblogs.com/pengcool/p/15610886.html
Copyright © 2011-2022 走看看