zoukankan      html  css  js  c++  java
  • 多线程实现数据库的并发操作

      在Java中,程序要操作数据库,首先要获得数据库的Connection对象。利用多线程把数据导入数据库可以加快操作进度,但是多个线程共享同一个Connection对象是不安全的,因此可以利用Java中的ThreadLocal为每个线程保存一个Connection对象,代码如下:

    package com.quar.innovation.db;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    
    /**
     * Hands out JDBC connections, keeping one connection per calling thread.
     *
     * Both accessors report failures on standard output and return {@code null}
     * instead of throwing, so callers must check the result.
     */
    public class ConnnectionManager {

    	private static final String BETADBURL = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&user=root&password=root";

    	// One connection slot per thread; filled lazily by getConnectionFromThreadLocal().
    	private static final ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>();

    	/**
    	 * Returns the connection bound to the current thread, opening and binding a
    	 * new one when the thread has none yet or its connection is closed.
    	 *
    	 * @return the thread's connection; {@code null} if opening one failed or the
    	 *         closed-check threw
    	 */
    	public static Connection getConnectionFromThreadLocal() {
    		try {
    			Connection cached = connectionHolder.get();
    			boolean usable = cached != null && !cached.isClosed();
    			if (usable) {
    				return cached;
    			}
    			// Nothing usable for this thread: open a replacement and remember it.
    			Connection fresh = ConnnectionManager.getConnection();
    			connectionHolder.set(fresh);
    			System.out.println("[Thread]" + Thread.currentThread().getName());
    			return fresh;
    		} catch (Exception e) {
    			System.out.println("[ThreadLocal Get Connection Error]" + e.getMessage());
    			return null;
    		}
    	}

    	/**
    	 * Loads the MySQL driver class and opens a brand-new connection to BETADBURL.
    	 *
    	 * @return the new connection, or {@code null} if the driver could not be
    	 *         loaded or the connection attempt failed
    	 */
    	public static Connection getConnection() {
    		try {
    			Class.forName("com.mysql.jdbc.Driver");
    			return DriverManager.getConnection(BETADBURL);
    		} catch (Exception e) {
    			System.out.println("[Get Connection Error]" + e.getMessage());
    			return null;
    		}
    	}
    }
    

      通过ThreadLocal就可以为每个线程保留一份Connection对象,利用Java的ThreadPoolExecutor启动线程池,完成数据库操作,完整代码如下:

    /**
     * A {@link ThreadPoolExecutor} that measures how long each task runs and, once
     * the pool has terminated, prints the number of finished tasks together with
     * their average duration in milliseconds.
     */
    public class QunarThreadPoolExecutor extends ThreadPoolExecutor {

    	// Start time (ms) of the task currently running on each worker thread.
    	private final ThreadLocal<Long> start = new ThreadLocal<Long>();

    	// Sum of the running times (ms) of all finished tasks.
    	private final AtomicLong totals = new AtomicLong();

    	// Number of tasks the pool has finished.
    	private final AtomicInteger tasks = new AtomicInteger();

    	public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
    			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    	}

    	public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
    			BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    	}

    	public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
    			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    	}

    	public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
    			BlockingQueue<Runnable> workQueue) {
    		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    	}

    	/**
    	 * Runs on the worker thread right before a task's run(); records the start time.
    	 */
    	@Override
    	protected void beforeExecute(Thread t, Runnable r) {
    		super.beforeExecute(t, r);
    		start.set(System.currentTimeMillis());
    	}

    	/**
    	 * Runs on the worker thread right after a task's run(); counts the task and
    	 * adds its elapsed time to the running total.
    	 */
    	@Override
    	protected void afterExecute(Runnable r, Throwable t) {
    		super.afterExecute(r, t);
    		Long startedAt = start.get();
    		// Clear the slot so a stale start time is never reused by a later task.
    		start.remove();
    		tasks.incrementAndGet();
    		if (startedAt != null) {
    			totals.addAndGet(System.currentTimeMillis() - startedAt.longValue());
    		}
    	}

    	/**
    	 * Runs once when the pool has fully terminated; prints the task statistics.
    	 */
    	@Override
    	protected void terminated() {
    		super.terminated();
    		int finished = tasks.get();
    		// A pool that is shut down before running anything must not divide by zero:
    		// the exception would escape from shutdown() in the caller.
    		long average = finished == 0 ? 0L : totals.get() / finished;
    		System.out.println("完成"+ finished +"个任务,平均耗时: [" + average + "] ms");
    	}
    }
    
    
    /**
     * Task that upserts one batch of user profiles into the {@code userprofile}
     * table, using the connection that ConnnectionManager keeps for the thread
     * running the task.
     *
     * Failures are reported on standard error; run() never throws.
     */
    public class DataUpdater implements Runnable {

    	private final String SQL = "insert into userprofile (`uid` ,`profile` , `logday`) VALUES (?, ? ,?) ON DUPLICATE KEY UPDATE `profile`= ? ";

    	private final List<UserProfileItem> userProfiles;

    	/**
    	 * @param userProfiles the rows to upsert; rows with a null/empty uid or
    	 *                     profile are skipped when the task runs
    	 */
    	public DataUpdater(List<UserProfileItem> userProfiles) {
    		this.userProfiles = userProfiles;
    	}

    	/**
    	 * Sends every valid row of the batch to the database in one JDBC batch.
    	 */
    	@Override
    	public void run() {
    		// Nothing to write: do not open a connection or a statement at all.
    		if (userProfiles == null || userProfiles.isEmpty()) {
    			return;
    		}
    		// Local, not a field: the statement belongs to this single execution.
    		PreparedStatement pst = null;
    		try {
    			java.sql.Connection conn = ConnnectionManager.getConnectionFromThreadLocal();
    			if (conn == null) {
    				// getConnectionFromThreadLocal() signals failure with null; say so
    				// instead of letting a NullPointerException print "null".
    				System.err.println("[SQL ERROR MESSAGE]" + "no database connection available for thread "
    						+ Thread.currentThread().getName());
    				return;
    			}
    			pst = conn.prepareStatement(SQL);
    			int queued = 0;
    			for (UserProfileItem userProfile : userProfiles) {
    				if (isWritable(userProfile)) {
    					pst.setString(1, userProfile.getUid());
    					pst.setString(2, userProfile.getProfile());
    					pst.setInt(3, userProfile.getLogday());
    					// Fourth placeholder is the value for the ON DUPLICATE KEY UPDATE clause.
    					pst.setString(4, userProfile.getProfile());
    					pst.addBatch();
    					++queued;
    				}
    			}
    			if (queued > 0) {
    				pst.executeBatch();
    			}
    		} catch (Exception e) {
    			System.err.println("[SQL ERROR MESSAGE]" + e.getMessage());
    		} finally {
    			 close(pst);
    		}

    	}

    	/**
    	 * Closes the statement if it is non-null, reporting (not throwing) any failure.
    	 *
    	 * @param pst the statement to close; may be null
    	 */
    	public void close(PreparedStatement pst) {
    		if (pst != null) {
    			try {
    				pst.close();
    			} catch (SQLException e) {
    				System.err.println("[Close Statement Error]" + e.getMessage());
    			}
    		}
    	}

    	// A row is written only when it exists and has a non-empty uid and profile.
    	private boolean isWritable(UserProfileItem item) {
    		return item != null
    				&& item.getUid() != null && !item.getUid().isEmpty()
    				&& item.getProfile() != null && !item.getProfile().isEmpty();
    	}
    }
    
    
    /**
     * Immutable row of the user-profile import: a user id, that user's profile
     * text, and the log day the row belongs to.
     *
     * Instances are created by the file-reading thread and read by pool worker
     * threads, so all fields are final.
     */
    public class UserProfileItem {

    	private final String uid;

    	private final String profile;

    	private final int logday;

    	/**
    	 * @param uid     user id; stored as given (may be null or empty)
    	 * @param profile profile text; stored as given (may be null or empty)
    	 * @param logday  log day as an int (the caller in this file passes 20150606)
    	 */
    	public UserProfileItem(String uid, String profile , int logday) {
    		this.uid = uid;
    		this.profile = profile;
    		this.logday = logday;
    	}

    	/** @return the user id passed to the constructor */
    	public String getUid() {
    		return uid;
    	}

    	/** @return the profile text passed to the constructor */
    	public String getProfile() {
    		return profile;
    	}

    	/** @return the log day passed to the constructor */
    	public int getLogday() {
    		return logday;
    	}

    }
    
    public class DataUpdaterMain {
    	
    	private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    	
    	private QunarThreadPoolExecutor qunarThreadPoolExecutor = new QunarThreadPoolExecutor(5, 8, 5, TimeUnit.MINUTES, queue);
    	
    	
    	public void shutThreadPool(ThreadPoolExecutor executor) {
    		if (executor != null) {
    			executor.shutdown();
    			try {
    				if (!executor.awaitTermination(20 , TimeUnit.MINUTES)) {
    					executor.shutdownNow();
    				} 
    			} catch (Exception e) {
    				System.err.println("[ThreadPool Close Error]" + e.getMessage());
    			}
    			
    		}
    	}
    	
    	public void close(Reader reader) {
    		if (reader != null) {
    			try {
    				reader.close();
    			} catch (IOException e) {
    				System.err.println("[Close Io Error]" + e.getMessage());
    			}
    		}
    	}
    	
    	public void closeConnection(Connection conn , Statement st) {
    		try {
    			if (conn != null) {
    				conn.close();
    			}
    			if (st != null) {
    				conn.close();
    			}
    		} catch (Exception e) {
    			System.err.println("[Close MySQL Error]" + e.getMessage());
    		}
    	}
    	
    	
    	public boolean update(String file ,int logday) {
    		long start = System.currentTimeMillis();
    		BufferedReader br = null;
    		int num = 0;
    		try {
    			br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
    			String line = null;
    			List<UserProfileItem> userProfiles = new LinkedList<UserProfileItem>();
    			while ((line = br.readLine()) != null) {
    				++num;
    				String []items = line.split("	");
    				if (items.length == 2) {
    					String uid = items[0];
    					String profile = items[1];
    					userProfiles.add(new UserProfileItem(uid, profile, logday));
    					if (userProfiles.size() >= 100) {
    						qunarThreadPoolExecutor.execute(new DataUpdater(userProfiles));
    						userProfiles = new LinkedList<UserProfileItem>();
    					}
    				} else {
    					System.err.println("[Data Error]" + line);
    				}
    			}
    			qunarThreadPoolExecutor.execute(new DataUpdater(userProfiles));;
    		} catch (Exception e) {
    			e.printStackTrace();
    			System.err.println("[Read File Error]" + e.getMessage());
    			return false;
    		}  finally {
    			System.err.println("[Update] take time " + (System.currentTimeMillis() - start) + ".ms");
    			System.err.println("[Update] update item " + num);
    			shutThreadPool(qunarThreadPoolExecutor);;
    			close(br);
    		}
    		return true;
    	}
    	
    	public static void main(String []args) {
    		String file = "D:\workspaces\promotionwordData.log";
    		int logday = Integer.parseInt("20150606");
    		DataUpdaterMain dataUpdaterMain = new DataUpdaterMain();
    		dataUpdaterMain.update(file, logday);
    	}
    }
    

      

  • 相关阅读:
    HTML 拖放 和 地理定位
    HTML web存储
    HTML 语义元素 和 MathML元素
    Docker Swarm
    Docker Machine
    Docker Compose
    Docker 的网络模式
    数据共享与持久化
    镜像和容器的基本操作
    Docker 简介
  • 原文地址:https://www.cnblogs.com/hanfight/p/4701763.html
Copyright © 2011-2022 走看看