zoukankan      html  css  js  c++  java
  • JAVA使用多线程进行数据处理

    import org.apache.commons.collections.CollectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * 以下是伪代码,要根据自己的实际逻辑进行整合
     */
    @Service
    public class PushProcessServiceImpl implements PushProcessService {
    
    
        private final static Logger logger = LoggerFactory.getLogger(PushProcessServiceImpl.class);
    
        /**
         *每个线程更新的条数
         * 这表示每次执行五千条数据的推送操作
         */
        private static final Integer LIMIT = 5000;
    
        /**
         * 起的线程数
         */
        private static final Integer THREAD_NUM = 5;
    
        /**
         * 创建线程池
         *
         * -  corePoolSize:线程核心参数选择了5
         *
         * - maximumPoolSize:最大线程数选择了核心线程数2倍数
         *
         * - keepAliveTime:非核心闲置线程存活时间直接置为0
         *
         * - unit:非核心线程保持存活的时间选择了 TimeUnit.SECONDS 秒
         *
         * - workQueue:线程池等待队列,使用 容量初始为100的 LinkedBlockingQueue阻塞队列
         *
         * 线程池拒绝策略,采用了默认AbortPolicy:直接丢弃任务,抛出异常。
         *
         */
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
    
    
        /**
         * 执行推送任务
         * @throws ExecutionException
         * @throws InterruptedException
         */
        public void pushData() throws ExecutionException, InterruptedException {
            //计数器,需要保证线程安全
            int count = 0;
    
            //这里从数据库查询出要推送数据总数,根据自己实际的来
            Integer total = pushProcessMapper.getCountByState(0);
    
    
            logger.info("未推送数据条数:{}", total);
            //计算需要循环多少轮
            int num = total / (LIMIT * THREAD_NUM) + 1;
            logger.info("要经过的轮数:{}", num);
    
            //统计总共推送成功的数据条数
            int totalSuccessCount = 0;
            for (int i = 0; i < num; i++) {
                //使用集合来接收线程的运行结果,防止阻塞,接收线程返回结果
                List<Future<Integer>> futureList = new ArrayList<>(32);
    
                //起THREAD_NUM个线程并行查询更新库,加锁
                for (int j = 0; j < THREAD_NUM; j++) {
                    //使用 synchronized 来保证线程安全,保证计数器的增加是有序的
                    synchronized (PushProcessServiceImpl.class) {
                        int start = count * LIMIT;
                        count++;
                        /**
                         * 提交线程,用数据起始位置标识线程
                         * 这里前两个参数start和limit参数相当于执行sql
                         *  limit start,limit
                         *
                         */
    
                        Future<Integer> future = pool.submit(new PushDataTask(start, LIMIT, start));
                        //先不取值,防止阻塞,放进集合
                        futureList.add(future);
                    }
                }
                //统计本轮推送成功数据
                for (Future f : futureList) {
                    totalSuccessCount = totalSuccessCount + (int) f.get();
                }
            }
            //把数据库的推送标识更新为已推送(已推送!=推送成功),可以根据自己实际的来
            pushProcessMapper.updateAllState(1);
    
            logger.info("推送数据完成,需推送数据:{},推送成功:{}", total, totalSuccessCount);
        }
    
        /**
         * 推送数据线程类
         */
        class PushDataTask implements Callable<Integer> {
            int start;
            int limit;
            int threadNo;   //线程编号
    
            PushDataTask(int start, int limit, int threadNo) {
                this.start = start;
                this.limit = limit;
                this.threadNo = threadNo;
            }
    
            @Override
            public Integer call() throws Exception {
                int count = 0;
                //分页查询每次执行的推送的数据,查询数据
                List<PushProcess> pushProcessList = pushProcessMapper.findPushRecordsByStateLimit(0, start, limit);
                if (CollectionUtils.isEmpty(pushProcessList)) {
                    return count;
                }
                logger.info("线程{}开始推送数据", threadNo);
    
                /**
                 * 遍历需要更新的数据实体类
                 */
                for (PushProcess process : pushProcessList) {
                    //这里是执行推送请求,根据自己实际的来,也可以要处理的任务
                    boolean isSuccess = pushUtil.sendRecord(process);
    
                    //根据主键更新推送是否成功状态标识
                    if (isSuccess) {
                        //推送成功
                        pushProcessMapper.updateFlagById(process.getId(), 1);
                        count++;
                    } else {
                        //推送失败
                        pushProcessMapper.updateFlagById(process.getId(), 2);
                    }
                }
                logger.info("线程{}推送成功{}条", threadNo, count);
                return count;
            }
        }
    }
    -----------------------有任何问题可以在评论区评论,也可以私信我,我看到的话会进行回复,欢迎大家指教------------------------ (蓝奏云官网有些地址失效了,需要把请求地址lanzous改成lanzoux才可以)
  • 相关阅读:
    [leetcode]34.Find First and Last Position of Element in Sorted Array找区间
    [leetcode]278. First Bad Version首个坏版本
    [leetcode]367. Valid Perfect Square验证完全平方数
    [leetcode]45. Jump Game II青蛙跳(跳到终点最小步数)
    [leetcode]55. Jump Game青蛙跳(能否跳到终点)
    [leetcode]26. Remove Duplicates from Sorted Array有序数组去重(单个元素只出现一次)
    [leetcode]27. Remove Element删除元素
    [leetcode]20. Valid Parentheses有效括号序列
    [leetcode]15. 3Sum三数之和
    C#中的局部类型
  • 原文地址:https://www.cnblogs.com/pxblog/p/14499149.html
Copyright © 2011-2022 走看看