zoukankan      html  css  js  c++  java
  • 八.利用springAMQP实现异步消息队列的日志管理

      经过前段时间的学习和铺垫,已经对spring amqp有了大概的了解。俗话说学以致用,今天就利用springAMQP来完成一个日志管理模块。大概的需求是这样的:系统中有很多地方需要记录操作日志,比如登录、退出、查询等,如果将记录日志这个操作掺杂在主要的业务逻辑当中,势必会增加响应的时间,对客户来说是一种不好的体验。所以想到用异步消息队列来进行优化。系统处理完主要业务逻辑之后,将日志的相关实体发布到特定Queue下,然后设置一个监听器,监该Queue的消息并做处理。客户不用等待日志的处理就可直接返回。

      大概的业务流程如下图所示。

      

      1.首先建立日志的数据表和实体,数据表起名为t_log。实体如下。主要包含操作者,操作的事件,操作时间等几个主要参数。  

    package com.xdx.entity;
    
    import java.util.Date;
    
    public class TLog {
        private Integer logId;
    
        private String operator;
    
        private String event;
    
        private Date createTime;
    
        private Integer isDel;
    
        public TLog(String operator, String event) {
            this.operator = operator;
            this.event = event;
        }
    
        public TLog() {
        }
    
        public Integer getLogId() {
            return logId;
        }
    
        public void setLogId(Integer logId) {
            this.logId = logId;
        }
    
        public String getOperator() {
            return operator;
        }
    
        public void setOperator(String operator) {
            this.operator = operator == null ? null : operator.trim();
        }
    
        public String getEvent() {
            return event;
        }
    
        public void setEvent(String event) {
            this.event = event == null ? null : event.trim();
        }
    
        public Date getCreateTime() {
            return createTime;
        }
    
        public void setCreateTime(Date createTime) {
            this.createTime = createTime;
        }
    
        public Integer getIsDel() {
            return isDel;
        }
    
        public void setIsDel(Integer isDel) {
            this.isDel = isDel;
        }
    }

      2.编写保存日志的方法,很简单,就是一个数据库的save过程。

      

    package com.xdx.service;
    
    import javax.annotation.Resource;
    
    import org.springframework.stereotype.Service;
    
    import com.xdx.dao.BaseDao;
    import com.xdx.entity.TLog;
    
    @Service
    public class LogService {
        @Resource(name = "baseDao")
        private BaseDao<TLog, Integer> baseDao;
    
        public Integer saveLog(TLog log) {
            Integer result = baseDao.addT("TLogMapper.insertSelective", log);
            return result;
        }
    }

      其中的TLogMapper.insertSelective代码如下:

     <insert id="insertSelective" parameterType="com.xdx.entity.TLog" >
        insert into t_log
        <trim prefix="(" suffix=")" suffixOverrides="," >
          <if test="logId != null" >
            log_id,
          </if>
          <if test="operator != null" >
            operator,
          </if>
          <if test="event != null" >
            event,
          </if>
          <if test="createTime != null" >
            create_time,
          </if>
          <if test="isDel != null" >
            is_del,
          </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides="," >
          <if test="logId != null" >
            #{logId,jdbcType=INTEGER},
          </if>
          <if test="operator != null" >
            #{operator,jdbcType=VARCHAR},
          </if>
          <if test="event != null" >
            #{event,jdbcType=VARCHAR},
          </if>
          <if test="createTime != null" >
            #{createTime,jdbcType=TIMESTAMP},
          </if>
          <if test="isDel != null" >
            #{isDel,jdbcType=INTEGER},
          </if>
        </trim>
      </insert>

      3.接下来就跟我们的spring amqp有关了,首先要在pom.xml中引入相关的jar包。 

        <!-- spring-rabbitMQ -->
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>2.0.1.RELEASE</version>
            </dependency>
            <!-- spring -amqp -->
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-amqp</artifactId>
                <version>2.0.1.RELEASE</version>
            </dependency>

      4.编写主配置文件。在项目新建一个com.xdx.spring_rabbit包。关于rabbit的所有代码都写在这边。

      编写一个抽象的rabbit的主配置文件,之所以这样做是为了以后扩展方便,让不同的异步消息队列的业务可以继承并扩展它。如下所示。

      主配置的文件主要是配置了连接Rabbit服务的基本信息,并且指定了消息转换器是json转换器。

    package com.xdx.spring_rabbit;
    
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    
    /**
     * 抽象类,rabbitMQ的主配置类
     * 
     * @author xdx
     *
     */
    public abstract class AbstractRabbitConfiguration {
    
        @Value("${amqp.port:5672}")
        private int port = 5672;
    
        protected abstract void configureRabbitTemplate(RabbitTemplate template);
    
        /**
         * 由于connectionFactory会与项目中的redis的connectionFactory命名冲突,
         * 所以这边改名为rabbit_connectionFactory
         * 
         * @return
         */
        @Bean
        public ConnectionFactory rabbit_connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                    "192.168.1.195");
            connectionFactory.setUsername("xdx");
            connectionFactory.setPassword("xxxx");
            connectionFactory.setPort(port);
            return connectionFactory;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(rabbit_connectionFactory());
            template.setMessageConverter(jsonMessageConverter());
            configureRabbitTemplate(template);
            return template;
        }
    
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean
        public AmqpAdmin amqpAdmin() {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbit_connectionFactory());
            return rabbitAdmin;
        }
    }

      5.编写我们这个日志项目需要用到的配置文件,继承上述的抽象类,在该配置文件中,我们具体指定Exchange,RouteKey,Queue,Binding以及监听器这些要素。

    package com.xdx.spring_rabbit;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 日志管理的Rabbit配置类的具体实现类
     * 
     * @author xdx
     *
     */
    @Configuration
    public class LogRabbitConfiguration extends AbstractRabbitConfiguration {
        protected static String LOG_EXCHANGE_NAME = "warrior.exchange.log";// topic
                                                                            // exchange的名称
        protected static String LOG_QUEUE_NAME = "warrior.queue.log";// 接收消息的queue
        protected static String LOG_ROUTING_KEY = LOG_QUEUE_NAME;
        @Autowired
        private LogRabbitRecHandler logRabbitRecHandler;// 监听器的委托类,委托其处理接收到的消息
    
        /**
         * 设置Exchange为LOG_EXCHANGE_NAME,RoutingKey为LOG_ROUTING_KEY,这样将信息发送到
         * Exchange为LOG_EXCHANGE_NAME,RouteKey为LOG_ROUTING_KEY的通道中
         */
        @Override
        protected void configureRabbitTemplate(RabbitTemplate template) {
            System.err.println("创建一个RabbitTemplate,名字是 " + template);
            template.setExchange(LOG_EXCHANGE_NAME);
            template.setRoutingKey(LOG_ROUTING_KEY);
        }
    
        /**
         * 用于接收日志消息的Queue,默认绑定自己的名称
         * 
         * @return
         */
        @Bean
        public Queue logQueue() {
            return new Queue(LOG_QUEUE_NAME);
        }
    
        /**
         * 定义一个topExchange
         * 
         * @return
         */
        @Bean
        public TopicExchange logExchange() {
            return new TopicExchange(LOG_EXCHANGE_NAME);
        }
    
        /**
         * 定义一个绑定日志接收的Queue的binding
         * 
         * @return
         */
        @Bean
        public Binding logQueueBinding() {
            return BindingBuilder.bind(logQueue()).to(logExchange())
                    .with(LOG_ROUTING_KEY);
        }
    
        /**
         * 这个bean为监听适配器,用于日志消息,并交由logRabbitRecHandler处理
         * 
         * @return
         */
        @Bean
        public MessageListenerAdapter messageListenerAdapter() {
            return new MessageListenerAdapter(logRabbitRecHandler,
                    jsonMessageConverter());
        }
    
        /**
         * 这个bean用于监听服务端发过来的消息,监听的Queue为logQueue(),
         * 因为该Queue绑定了logExchange和logRouteKey, 所以它可以接收到我们发送的日志消息
         * @return
         */
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
                    rabbit_connectionFactory());
            container.setConcurrentConsumers(5);
            container.setQueues(logQueue());
            container.setMessageListener(messageListenerAdapter());
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            return container;
        }
    }

      6.封装发送消息的接口,如下所示。这是一个泛型的接口,目的是为了传入不同的消息类型。

    package com.xdx.spring_rabbit;
    /**
     * 定义一个泛型接口,用于发送消息,T为要发送的消息类型
     * @author xdx
     *
     * @param <T>
     */
    public interface RabbitSend<T> {
        void send(T t);
    }

      7.实现这个发送消息的接口。在这个实现类中,我们注入了之前生成的RabbitTemplate对象。用于发送消息。

    package com.xdx.spring_rabbit;
    
    import javax.annotation.Resource;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    
    import com.xdx.entity.TLog;
    
    /**
     * 用于发送日志消息的通用实现类
     * 
     * @author xdx
     *
     */
    @Component("logRabbitSend")
    public class LogRabbitSend implements RabbitSend<TLog> {
        @Resource(name = "rabbitTemplate")
        private RabbitTemplate rabbitTemplate;
    
        @Override
        public void send(TLog log) {
            rabbitTemplate.convertAndSend(log);
            System.err.println("发送消息:" + log);
        }
    }

      8.封装监听器委托对象的接口,该接口用于处理监听器监听到的消息。同意是一个泛型的类,如下所示。

    package com.xdx.spring_rabbit;
    
    /**
     * 用于处理监听到的消息的消息处理器接口,T为接收到的消息的类型
     * 
     * @author xdx
     *
     * @param <T>
     */
    public interface RabbitRecHandler<T> {
        void handleMessage(T t);
    }

      9.实现上述委托对象的接口,如下所示。在该接口中,我们注入了日志处理类的对象。用于储存日志信息到数据库。

    package com.xdx.spring_rabbit;
    
    import javax.annotation.Resource;
    
    import org.springframework.stereotype.Component;
    
    import com.xdx.entity.TLog;
    import com.xdx.service.LogService;
    @Component("logRabbitRecHandler")
    public class LogRabbitRecHandler implements RabbitRecHandler<TLog> {
        @Resource(name="logService")
        private LogService logService;
    
        @Override
        public void handleMessage(TLog log) {
            System.err.println("开始存储日志"+log.getOperator()+","+log.getEvent());
            logService.saveLog(log);
        }
    }

      10.最后,我们在具体的业务类中调用消息发送的接口,就可以实现日志消息的发送了。如下所示。

    @Controller
    public class AdminController {
        @Resource(name = "logRabbitSend")
        private LogRabbitSend logRabbitSend;
        
        @RequestMapping("admin")
        public ModelAndView admin(HttpSession session,String adminName, String password) throws Exception {
            List<Map<String,Object>>adminMap=adminService.getAllAdminMap();
            ModelAndView mv = new ModelAndView();
            //登录操作的主要逻辑代码……
            session.setAttribute("adminName", admin.getAdminName());
            session.setAttribute("realName", admin.getRealName());
            TLog log=new TLog(adminName, "登录系统");
            logRabbitSend.send(log);
            return mv;
        }
    }

      运行我们的系统,我们先看看RabbitMQ的后台。看到了我们定义的Exchange和Queue等元素。

     

      运行AdmintController类中的admin方法,登录系统,我们发现确实已经发送了消息,并且消息被监听到,然后存储到了数据库。

      控制台打印出来的消息为:

      数据库存入的记录为:

  • 相关阅读:
    开放就像死亡访问之后就能回头——Leo鉴书84
    将博客搬至CSDN
    将博客搬至CSDN
    滚动条
    Perl Pack写的一个数据报表程序
    利用hash 数组打印标题
    Linux显示只显示目录文件
    Linux显示按文件名降序文件
    Linux显示以时间生升序显示文件
    Linux显示按文件大小降序排列
  • 原文地址:https://www.cnblogs.com/roy-blog/p/8125049.html
Copyright © 2011-2022 走看看