zoukankan      html  css  js  c++  java
  • Mysql多线程插入10万数据

    从excel导入十万数据到mysql表中

    1.excel中导入数据到mongo暂存

     /**
      * Reads an uploaded .xlsx workbook and stages the rows that do not yet
      * exist as customers into MongoDB as {@code ImportTask} documents.
      *
      * <p>Row 0 of every sheet is skipped (treated as a header). For each
      * remaining row, the first populated cell becomes the customer name and
      * the cell at column index 1 becomes the company number. A row is staged
      * only when {@code waitingStorageMapper.findCustomerByCoNum} returns
      * {@code null} for its company number.
      *
      * @param file the uploaded Excel file
      * @return the success response produced by {@code getSuccessResponse("")}
      * @throws ApplicationException if the workbook cannot be read
      */
     @ResponseBody
     @PostMapping("importTaskV2")
     @ApiOperation("导入任务V2")
     public DggRestResponse importByExcel(@Param("file") MultipartFile file) {
         List<ImportTask> tasks = new ArrayList<>();
         // try-with-resources guarantees the workbook is closed even when parsing fails.
         try (XSSFWorkbook workbook = new XSSFWorkbook(file.getInputStream())) {
             int sheetCount = workbook.getNumberOfSheets();
             for (int sheetIx = 0; sheetIx < sheetCount; sheetIx++) {
                 XSSFSheet sheet = workbook.getSheetAt(sheetIx);
                 for (int rowNum = 1; rowNum <= sheet.getLastRowNum(); rowNum++) {
                     XSSFRow row = sheet.getRow(rowNum);
                     // getRow() returns null for rows that were never written, and
                     // getFirstCellNum() is -1 for rows without cells; neither can
                     // produce a meaningful task.
                     if (row == null || row.getFirstCellNum() < 0) {
                         continue;
                     }
                     log.debug("****{}.....", rowNum);
                     ImportTask task = buildImportTask(row);
                     // NOTE(review): this is one lookup per row; for very large
                     // files a batched lookup would be cheaper, but the mapper
                     // offers no such method in the visible code.
                     ArchivesCustomer customer = waitingStorageMapper.findCustomerByCoNum(task.getCoNum());
                     if (customer == null) {
                         tasks.add(task);
                     }
                 }
             }
         } catch (IOException e) {
             // Log the cause so the failure can be diagnosed.
             log.error("导入发生异常!", e);
             throw new ApplicationException("导入发生异常!");
         }
         if (!tasks.isEmpty()) {
             mongoTemplate.insertAll(tasks);
         }
         return getSuccessResponse("");
     }

     /**
      * Builds one import task from a spreadsheet row that has at least one cell.
      * Missing cells yield {@code null} values rather than the text "null".
      */
     private ImportTask buildImportTask(XSSFRow row) {
         int firstCol = row.getFirstCellNum();
         int endCol = row.getLastCellNum();
         ImportTask task = new ImportTask();
         task.setId(DggKeyWorker.nextId());
         task.setType(0);
         for (int colIx = firstCol; colIx < endCol; colIx++) {
             XSSFCell cell = row.getCell(colIx);
             String value = (cell == null) ? null : cell.toString();
             if (colIx == firstCol) {
                 task.setCustomerName(value);
             }
             // The company number is always read from absolute column index 1.
             if (colIx == 1) {
                 task.setCoNum(value);
             }
         }
         return task;
     }

    2.从mongo读取数据,并存入mysql中

    (1):在启动类上,先初始化线程池大小

    /**
     * Spring Boot entry point. Also registers the shared thread pool bean.
     */
    @SpringBootApplication
    @ImportResource(locations = {"classpath:spring/spring-context.xml"})
    @MapperScan(basePackages = "net.dgg.iboss.models.**.mapper")
    public class TestApplication {

        /** Boots the Spring application context. */
        public static void main(String[] args) {
            SpringApplication.run(TestApplication.class, args);
        }

        /**
         * Shared thread pool: a fixed 20 threads (core size equals maximum size),
         * zero keep-alive, and a bounded work queue holding up to 500 tasks.
         *
         * @return the shared executor
         */
        @Bean
        public Executor executor() {
            int poolSize = 20;
            int queueCapacity = 500;
            long keepAliveMillis = 0L;
            return new ThreadPoolExecutor(
                    poolSize,
                    poolSize,
                    keepAliveMillis,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(queueCapacity),
                    new CustomerThreadFactory("CustomerThread"));
        }
    }

    (2):准备好插入数据的线程

    @Slf4j
    public class ImportDataThread implements Runnable {
        //加锁防止重复导入
    private CountDownLatch countDownLatch;

    private MongoTemplate mongoTemplate;
        //需要导入的数据
    private List<ImportTask> toImportTasks;
        //多线程中安全的Long类型
    private AtomicLong failTotal;

    private AtomicLong successTotal;

      private InsertService insertService;
    public ImportDataThread(CountDownLatch countDownLatch, MongoTemplate mongoTemplate, List<ImportTask> toImportTasks, AtomicLong successTotal, AtomicLong failTotal,InsertService insertService
    ){
    this.countDownLatch = countDownLatch;
    this.mongoTemplate = mongoTemplate;
    this.toImportTasks = toImportTasks;
    this.failTotal = failTotal;
    this.successTotal = successTotal;
         this.insertService=insertService;

    }

    @Override
    public void run() {
    for (int i = 0; i < toImportTasks.size(); i++) {
    ImportTask toImportTask = toImportTasks.get(i);
    //封装需要导入的数据(比如id等其他字段)
         toImportTask.setId(keyWork.nextId());
             toImportTask.setCreatTime("手艺人")
    //开启手动事务
    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    DataSourceTransactionManager txManagger = applicationContext.getBean( DataSourceTransactionManager.class);
    TransactionStatus status =txManagger.getTransaction(def);
    try {
     插入数据
              insertService.insert()
    txManagger.commit(status);
    successTotal.addAndGet(1);
    }catch (Exception e){
    log.error("导入时发生异常!",e);
    failTotal.addAndGet(1);
    txManagger.rollback(status);
    }
    }
    countDownLatch.countDown();
    }
    }

    (3):插入mysql

    /**
     * Loads every staged import task of type 0 from MongoDB, distributes the
     * tasks across worker threads on the shared executor, and blocks until all
     * workers have finished.
     *
     * <p>When there are no more tasks than {@code threadNum}, each task gets
     * its own worker. Otherwise the tasks are split into {@code threadNum}
     * contiguous slices, with the last slice also taking the remainder.
     *
     * @param roomId passed through to {@code customersService.importGetUnits} and to the workers
     * @return a map with the keys {@code "successTotal"} and {@code "failTotal"}
     * @throws ApplicationException if the calling thread is interrupted while waiting
     */
    public HashMap<String, Object> importData(Long roomId) {
        AtomicLong successTotal = new AtomicLong(0);
        AtomicLong failTotal = new AtomicLong(0);
        // Query all tasks that still need to be imported.
        Query query = new Query();
        query.addCriteria(Criteria.where("type").is(0));
        List<ImportTask> allTasks = mongoTemplate.find(query, ImportTask.class);
        int taskTotal = allTasks.size();
        if (taskTotal > 0) {
            if (taskTotal <= threadNum) {
                // No more tasks than threads: one worker per task.
                CountDownLatch countDownLatch = new CountDownLatch(taskTotal);
                for (int i = 0; i < taskTotal; i++) {
                    List<ImportTask> toImportTasks = new ArrayList<>(1);
                    toImportTasks.add(allTasks.get(i));
                    // NOTE(review): `units` sits in the position where the visible
                    // ImportDataThread constructor expects an InsertService — confirm.
                    executor.execute(new ImportDataThread(countDownLatch,mongoTemplate,toImportTasks,successTotal,failTotal,units));
                }
                awaitImportWorkers(countDownLatch);
            } else {
                // More tasks than threads: split into threadNum contiguous slices.
                CountDownLatch countDownLatch = new CountDownLatch(threadNum);
                // Integer division already rounds down; no Math.floor needed.
                int sliceSize = taskTotal / threadNum;
                // NOTE(review): the result is not used in the visible code; the call
                // is kept in case it has side effects — confirm.
                List<ArchivesUnit> allMappingUnits = customersService.importGetUnits(taskTotal,roomId);
                for (int i = 0; i < threadNum; i++) {
                    int from = i * sliceSize;
                    // The last worker also takes the remainder of the division.
                    int to = ((i + 1) == threadNum) ? taskTotal : from + sliceSize;
                    List<ImportTask> toImportTask = allTasks.subList(from, to);
                    // NOTE(review): this argument list does not match the six-argument
                    // ImportDataThread constructor shown elsewhere, and `toImportUnits`
                    // is not declared in the visible code — confirm the real signature.
                    executor.execute(new ImportDataThread(countDownLatch,mongoTemplate,customerDAO,mappingMapper,boxDAO,toImportTask,roomId,successTotal,failTotal,customersService,toImportUnits));
                }
                awaitImportWorkers(countDownLatch);
            }
        }
        HashMap<String, Object> result = new HashMap<>();
        result.put("successTotal",successTotal.get());
        result.put("failTotal",failTotal.get());
        return result;
    }

    /**
     * Blocks until every worker has counted the latch down. On interruption
     * the interrupt flag is restored before the failure is reported.
     */
    private void awaitImportWorkers(CountDownLatch countDownLatch) {
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interrupt.
            Thread.currentThread().interrupt();
            log.error("导入发生异常!", e);
            throw new ApplicationException("导入发生异常!");
        }
    }
  • 相关阅读:
    作业2 求题目中的数
    2013 C#单元测试
    实现项目WC
    带括号多项式版四则运算
    20道简单加减法随机生成程序扩展版体会
    20道简单加减法随机生成程序
    Jeesite 集成微信支付接口
    第一节:JAVA 语言的学习目标
    vector(未完)
    关于phpstorm端口63342的修改经历
  • 原文地址:https://www.cnblogs.com/lml-home/p/11712003.html
Copyright © 2011-2022 走看看