zoukankan      html  css  js  c++  java
  • Semaphore、CountDownLatch和Atomic结合使用的典型“保持有序加速消费”例子

    不多说,直接上代码(方便以后copy):

    package com.meerkat.logic.dataAnalysis;
    
    import com.meerkat.foundation.api.ImsiRecordSearchApi;
    import com.meerkat.pojo.po.foundation.imsi.ImsiRecord;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.stream.Collectors;
    
    /**
     * @author zz
     * @version 1.0
     * @date 2021/1/8 11:30
     */
    @Slf4j
    @Component
    public class TempDemo {
        @Autowired
        private ImsiRecordSearchApi imsiRecordSearchApi;
    
        private ExecutorService debutPool = Executors.newSingleThreadExecutor();
    
        private ExecutorService comparePool = Executors.newFixedThreadPool(20);
    
        private Semaphore semaphore = new Semaphore(20);
    
        public void asynExeDebutResultJob(String taskId, boolean needDeleteBefore) {
            debutPool.execute(new Runnable() {
                @Override
                public void run() {
    
                    //大概1.4亿条数据
                    List<ImsiRecord> analysisTotal = imsiRecordSearchApi.search(null, null);
    
                    //分组后有几千万条imsi
                    Map<String, List<ImsiRecord>> map = analysisTotal.stream().collect(Collectors.groupingBy(ImsiRecord::getImsi));
    
                    CountDownLatch cdl = new CountDownLatch(map.keySet().size());
                    //保存所有首次出现的imsi
                    AtomicReference<List<String>> firstImsiList = new AtomicReference<>(new CopyOnWriteArrayList<>());
    
                    for (String imsi : map.keySet()) {
                        semaphore.acquireUninterruptibly();
                        comparePool.execute(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    //判断是否是首次出现的imsi
                                    boolean exists = imsiRecordSearchApi.catchInTimeQuantum(imsi, null);
                                    if (!exists) {
                                        firstImsiList.get().add(imsi);
                                    }
                                } finally {
                                    semaphore.release();
                                    cdl.countDown();
                                }
                            }
                        });
                    }
                    try {
                        cdl.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                    //后面要用到firstImsiList,一定要先用final取出来
                    final List<String> imsiList = firstImsiList.getAndSet(new CopyOnWriteArrayList<>());
                }
            });
    
        }
    }
  • 相关阅读:
    购物菜单
    增删改查
    第七次Android
    第七次作业
    第四次作业
    第二次作业
    第七次
    第二次作业
    第三次作业
    第六周安卓作业
  • 原文地址:https://www.cnblogs.com/zz-3m23d-begining/p/14250686.html
Copyright © 2011-2022 走看看