zoukankan      html  css  js  c++  java
  • Spring与多线程

    前言背景

    在做新项目,作为中间件的项目,主要做数据服务。这次想把项目做的简洁一些,之前用的什么ActiveMq等中间件产品,这次全部不用,能自己实现就自己实现。自己用BlockingQueue阻塞队列,按照自己的数据量,1G内存也能存上两千多万数据。设计上,需要一个线程去阻塞队列中拿数据,必须是系统启动的时候就去取。没有则阻塞,直到有数据来。
    首先一个问题是,在spring项目中,自定义的New对象和线程,是不受spring管理的。所以在以前的处理中,经常是写一个单例,获取ApplicationContext,这种方式是可行的。但是在用注解的时候,使用这种方式,看上去会比较丑陋。所以,我们还是希望,在使用线程的时候,可以 像以前那样,可以自动的注入我们需要的bean。

    开始动手

    还是用实际的项目来举个栗子。
    作为数据服务,入口是一个webservices的接口,这里因为历史原因,还是采用soap方式。
    入口如下:

          @WebMethod
            public FeedResult send(NocPacket nocPacket) {
                    logger.info("Receive data");
                    QueueManager.getInstance().put(nocPacket,Const.nocQueue);
                    return "successful";
            }
    

    这里采用生产者和消费者的模式,其中的Const.nocQueue,是我们的BlockingQueue,作为数据缓冲区。
    既然如此,还应该有一个消费者。
    之前说过,我们要在系统启动的时候,就去queue中取我们的数据,有则拿出来,作为业务处理,没有则阻塞,等待数据到来。同时,拿到数据后,将这些数据入库。
    这里需要两个类,一个是随系统一起启动的,暂且叫守护程序吧。另一个是做主业务的。
    守护程序代码:

    /**
     * Created by Wl on 15-6-17.
     * 业务消费者
     */
    
    @Component
    public class Business  {
            private Logger logger = Logger.getLogger(Business.class);
    
            @Autowired
            private FlowDataService<FlowData> flowDataService;
    
            public void doBusi() {
                    logger.info("业务线程开始");
                    ExecutorService executor = Executors.newCachedThreadPool();
                    BusiTask task = new BusiTask(Const.nocQueue,flowDataService);
                    executor.submit(task);
                    FlowData data = flowDataService.queryById(1);
                    logger.info("Get Data :"+data.getPlateNo());
            }
    }
    

    处理业务的类:

    /**
     * Created by Wl on 15-6-17.
     * 业务处理主方法
     */
    public class BusiTask implements Callable<BlockingQueue<NocPacket>> {
            private Logger logger = Logger.getLogger(BusiTask.class);
            private BlockingQueue<NocPacket> queue;
            private FlowDataService<FlowData> flowDataService;
    
            public BusiTask(BlockingQueue<NocPacket> queue,FlowDataService<FlowData> flowDataService) {
                    this.queue = queue;
                    this.flowDataService = flowDataService;
            }
    
            public BlockingQueue<NocPacket> call() throws Exception {
                    NocPacket nocPacket = queue.take();
                    logger.info("Data:"+nocPacket.getGWName());
                    Payload sc = NocUtil.getPayload(nocPacket);
                    FlowData data = new FlowData();
                    flowDataService.add(data);
                    logger.info("成功插入数据");
                    return null;
            }
    }
    

    此处,守护程序是随系统一起启动的,这一点可以用spring配置。这个守护程序不能有特殊性,它不能是我们自建的一个线程,也不能new一个方法,再去调用doBusi,因为这样就不受spring管理了。它的第一个目的是拿到注入的service。
    这两个类,采用Future和Callable的方式进行异步线程处理的。如果直接去处理BusiTask中的任务,就会阻塞到queue的take上。同时,通过守护程序,BusiTask也可以拿到services,进行入库等操作。

    结尾

    spring是可以进行多线程开发的,目前没使用过,以后有时间好好研究下。

  • 相关阅读:
    k8s使用私有镜像仓库
    spark client 配置lzo
    jvm系列(四):jvm调优-命令篇
    mysqldump 备份还原数据库
    df 卡死及ls无法查看文件
    记录一次服务器断电,直接进入救援模式
    nginx开机自启脚本
    mongodb启动关闭脚本
    mongo数据备份恢复
    centos 快速配置网络
  • 原文地址:https://www.cnblogs.com/juepei/p/4583952.html
Copyright © 2011-2022 走看看