zoukankan      html  css  js  c++  java
  • ssm+RabbitMQ 整合

    1. 配置说明

    1.配置rabbitmq.properties

    rmq.ip=192.168.5.109
    rmq.port=5672
    rmq.producer.num=20
    rmq.manager.user=test
    rmq.manager.password=123456

    2.使用注解配置RabbitMqConfiguration类,注入spring容器

    @Component
    public class RabbitMqConfiguration {
        
        private static Properties props;
        
        private static String ip;
        private static String port;
        private static String username;
        private static String password;
    
        private static final String CONF_NAME = "rabbitmq.properties";
    
        static {
            props = new Properties();
            try {
                String path = Tools.getRootPlugInPath() + CONF_NAME;
                System.out.println(path);
                props.load(new InputStreamReader(new FileInputStream(path), "UTF-8"));
                ip = props.getProperty("rmq.ip");
                port = props.getProperty("rmq.port");
                username = props.getProperty("rmq.manager.user");
                password = props.getProperty("rmq.manager.password");
            } catch (IOException e) {
                System.out.println("配置文件读取异常");
            }
        }
        
        @Bean
        public AmqpTemplate rabbitTemplate() {
            System.out.println("#############################-------RabbitMq Start--------################################");
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            RetryTemplate retryTemplate = new RetryTemplate();
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(500);
            backOffPolicy.setMultiplier(10.0);
            backOffPolicy.setMaxInterval(30000);
            retryTemplate.setBackOffPolicy(backOffPolicy);
            template.setRetryTemplate(retryTemplate);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            System.out.println("#############################-------RabbitMq End--------################################");
            return template;
        }
        
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(ip);
            connectionFactory.setPort(Integer.valueOf(port));
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            System.out.println(connectionFactory);
            return connectionFactory;
        }
    }

    3.工具类RabbitmqService,写了发送消息、接收消息的公用方法

    @Service
    public class RabbitmqService {
        
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        @Resource
        private AmqpTemplate amqpTemplate;
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        
        public boolean sendMsg(String exchangeName,String queueName, Object object) {
            try {
                amqpTemplate.convertAndSend(exchangeName,queueName, object);
            } catch (Exception e) {
                logger.error(e.getMessage());
            }
            return true;
        }
    
        public String reciveMsg(String queueName) {
    
            try {
                Message ms = amqpTemplate.receive(queueName);
                return ms.toString();
            } catch (Exception e) {
                logger.error(e.getMessage());
            }
            return "";
        }
    
        public int getCount(String exchangeName,String queueName){
            ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
            // 创建连接
            Connection connection = connectionFactory.createConnection();
            // 创建通道
            Channel channel = connection.createChannel(false);
            // 设置消息交换机
            try {
                channel.exchangeDeclare(exchangeName, "direct", true, false, null);
            } catch (IOException e) {
                e.printStackTrace();
            }
            AMQP.Queue.DeclareOk declareOk = null;
            try {
                declareOk = channel.queueDeclarePassive(queueName);
            } catch (IOException e) {
                e.printStackTrace();
            }
            //获取队列中的消息个数
            int queueCount = declareOk.getMessageCount();
    
            // 关闭通道和连接
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            connection.close();
    
            return queueCount;
    
        }
        
    }

    2. 发送消息调用(生产者)

    @Resource
    private RabbitmqService rabbitmqService;
    
    rabbitmqService.sendMsg("writeSYLog-exchange","writeSYLog", message);

    在代码中注入RabbitmqService ,调用sendMsg方法,其中

    参数1 为交换机名称

    参数2为 队列名称

    参数3为要发送的消息

    3. 接收消息调用(消费者)

    ① 第一种:同步接收消息

     @Resource
    private RabbitmqService rabbitmqService;

    在代码中注入RabbitmqService ,分别调用getCount

    reciveMsg方法,获取队列中的消息数量以及接收消息

    其中:

    getCount()方法(默认使用了direct类型) 根据交换机名称和队列名称查询队列中所有的待消费消息

    参数1 为交换机名称

    参数2为 队列名称

    reciveMsg()方法,根据队列名消费消息

    参数 为队列名称

    @Resource
    private RabbitmqService rabbitmqService;
    
    @RequestMapping("testrecive")
        public void testrecive() {
            int count = rabbitmqService.getCount("writeSYLog-exchange","writeSYLog");
            for (int i =0;i<count;i++){
                String resultMessage = rabbitmqService.reciveMsg("writeSYLog");
                System.out.println(resultMessage);
            }
    
        }

    ② 第二种:异步接收消息

    使用监听器来异步接收消息,配置spring-rabbitmq.xml

    <!-- 配置rabbitMQ服务基本信息 -->
    
    <rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}" username="${rmq.manager.user}" password="${rmq.manager.password}"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    
     
    
    <!-- 以下配置可配置多个 -->
    <!-- 配置rabbitMQ监听器类 -->
    <bean id="queueListenter" class="com.censoft.cends.listener.DirectReceiver"></bean>
    <!-- 配置监听器监听的队列名 -->
     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
       <rabbit:listener queue-names="writeSYLog" ref="queueListenter" />
    </rabbit:listener-container>

       创建com.censoft.cends.listener.DirectReceiver类继承MessageListener类,重写onMessage方法,获取消息进行消费

    public class DirectReceiver implements MessageListener {
        @Override
        public void onMessage(Message msg) {
            String str = "";
            try {
                str = new String(msg.getBody(), "UTF-8");
                logger.info("==获取消息==" + str);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
  • 相关阅读:
    怎样在过滤器中读取配置信息?
    怎样将直接数据库中Json字段,映射到Mybatis中的Map类型?
    spring/boot 打包,资源/配置/业务文件分离
    使用VS Code推送代码到GitHub
    Clion下jni配置
    curl post请求总是返回417错误
    ubuntu 12.10 apt-get 源
    如何让git小乌龟工具TortoiseGit记住你的账号密码
    FastCgi与Cgi
    Libevent核心原理
  • 原文地址:https://www.cnblogs.com/winddogg/p/14113739.html
Copyright © 2011-2022 走看看