zoukankan      html  css  js  c++  java
  • Java 线程池 +生产者消费者+MySQL读取300 万条数据

    1.1需求

        数据库300 万条用户数据 ,遍历获取所有用户, 各种组合关联, 获取到一个新的json ,存到redis 上。

    1.2 难点

      数据库比较多, 不可能单线程查询所有的数据到内存。

    1.3解决办法

     多线程读取, 生产者 每次获取200 条数据, 消费者去消费。(这里 主要是根据MySQL分页去获取下一个200 条数据)

    1.4 代码

    1.4.1 调用方法

        /**
         * 线程启动
         */
        public void update() {
    //redis操作类 HashRedisUtil redisUtil
    = HashRedisUtil.getInstance(); //生产者消费者 ProducerConsumer pc = new ProducerConsumer(); //数据仓库 Storage s = pc.new Storage(); ExecutorService service = Executors.newCachedThreadPool(); //一个线程进行查询 Producer p = pc.new Producer(s,userMapper); service.submit(p); System.err.println("生产线程正在生产中。。。。。。。。。"); //是个线程进行修改 for(int i=0;i<10;i++){ System.err.println("消费线程"+i+"正在消费中。。。。。。。。。。"); service.submit(pc.new Consumer( redisUtil,userMapper,s)); } }

    1.4.2 主要核心类

    package com.ypp.thread;
    
    
    import java.math.BigDecimal;
    import java.util.Calendar;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.log4j.Logger;
    import org.joda.time.LocalDateTime;
    
    import com.alibaba.fastjson.JSONObject;
    import com.ypp.constants.Constants;
    import com.ypp.mapper.UserMapper;
    import com.ypp.model.User;
    import com.ypp.model.UserAlis;
    import com.ypp.model.UserBaseModel;
    import com.ypp.model.UserVip;
    import com.ypp.util.HashRedisUtil;
    import com.ypp.util.JsonUtils;
    import com.ypp.util.PHPSerializer;
    
    
    public class ProducerConsumer {
        private static Logger logger = Logger.getLogger(ProducerConsumer.class);
    //这个page 是核心, 全局变量, 当生产者生产一次 ,获取200 个用户, 会把这个page++, 下次获取就是后一个200 条用户了
    private static Integer page = 0; //消费者
    public class Consumer implements Runnable { private HashRedisUtil redisUtil; private UserMapper userMapper; private Storage s = null; public Consumer(HashRedisUtil redisUtil, UserMapper userMapper, Storage s) { super(); this.redisUtil = redisUtil; this.userMapper = userMapper; this.s = s; } public void run() { try { while (true) { User users = s.pop(); long bbb = System.currentTimeMillis(); // 获取一个用户的粉丝列表 并存到redis try { fansUpdate(users.getToken(), users.getUserId(), redisUtil); } catch (Exception e1) { e1.printStackTrace(); } // 获取一个用户的关注列表, 并存到redis try { followUpdate(users.getToken(), users.getUserId(), redisUtil); } catch (Exception e) { e.printStackTrace(); } // 获取一个用户的黑名单, 并存到redis try { blackUpdate(users.getToken(), users.getUserId(), redisUtil); } catch (Exception e) { e.printStackTrace(); } // 用户基本信息 try { userbaseUpdate(users.getToken(), users.getUserId(), redisUtil); } catch (Exception e) { e.printStackTrace(); } long ccc = System.currentTimeMillis(); System.out.println("用户:" + users.getToken() + " 全部总共耗时:" + (ccc - bbb) + "毫秒"); Thread.sleep(500); } } catch (InterruptedException e) { e.printStackTrace(); } } public List<User> getUserInfo(Integer iThread) { return userMapper.findUserInfo((iThread - 1) * 200 + 1); } /** * 用户基本信息修改 * * @param token * @param myuserId * @param redisUtil * @throws Exception */ private void userbaseUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } /** * 更新一个用户的黑名单(原来的token改成userID) * * @param token * @param string * @param redisUtil * @throws Exception */ private void blackUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } /** * 获取一个用户的关注 * * @param token * @param string * @param redisUtil * @throws Exception */ private void followUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } /** * 获取一个用户的粉丝列表 * * @param token * @param userId * @param redisUtil * @throws Exception */ private void fansUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } //生产者 public class Producer implements Runnable { private Storage s = null; private UserMapper mapper ; public Producer( Storage s, UserMapper mapper) { this.s = s; this.mapper = mapper; } public void run() { try { while (true) { System.err.println("当前分页是:"+page+"****************************************"); List<User> list= mapper.findUserInfo(page); s.push(list); page++; } } catch (InterruptedException e1) { e1.printStackTrace(); } } }
    //数据仓库
    public class Storage { BlockingQueue<User> queues = new LinkedBlockingQueue<User>(200); /** * 生产 * * @param p * 产品 * @throws InterruptedException */ public void push(List<User> p) throws InterruptedException { for(User user:p){ queues.put(user); } } /** * 消费 * * @return 产品 * @throws InterruptedException */ public User pop() throws InterruptedException { return queues.take(); } } }
  • 相关阅读:
    26. 60s快速定位服务器性能问题
    27. 性能测试总体流程
    18. Jmeter-取样器二
    17. Jmeter-取样器一
    15. Jmeter-配置元件二
    14. Jmeter-配置元件一
    13. Jmeter-定时器
    git 常用命令
    数据库常用操作
    【CSS】文字超出显示省略号&连续字符换行
  • 原文地址:https://www.cnblogs.com/zgghb/p/6601869.html
Copyright © 2011-2022 走看看