zoukankan      html  css  js  c++  java
  • 基于文件系统(及MySQL)使用Java实现MapReduce

    实现这个代码的原因是:

    • 我会MapReduce,但是之前都是在AWS EMR上,自己搭过伪分布式的,但是感觉运维起来比较困难;
    • 我就MySQL会一点(本来想用mongoDB的但是不太会啊)
    • 数据量不是很大,至少对我来说。
    • 希望不要出什么问题,这方面文件系统还是可以信任的。

    设计思路如下:

    • init阶段:将所需的文件添加到一个列表文件input_file_list.txt中。
    • Map阶段:读取input_file_list.txt中的每一个文件的每一行,并将其映射成一个key-value对。
      考虑到key可能包含特殊字符,所以这里使用MySQL存储一个id到key的对应关系的数据。
    • Reduce阶段:针对每一个key,读取对应的文件,最终生成一个name-value列表,该name-value列表对应一个json对象,如:{ "name": "zifeiy", "age": 88 },将所有的json对象存储到一个结果文件reduce.txt中(每行一个json对象)。
    • 处理结果阶段:将reduce.txt文件进行解析,最终生成结果的CSV文件或者Excel文件。

    主要代码:

    package com.zifeiy.snowflake.tools.mapreduce.v1;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import com.google.gson.Gson;
    import com.zifeiy.snowflake.assist.CsvOneLineParser;
    import com.zifeiy.snowflake.assist.FileHelper;
    
    import jxl.Workbook;
    import jxl.write.Label;
    import jxl.write.WritableSheet;
    import jxl.write.WritableWorkbook;
    
    
    public abstract class MapReduceBaseVersion1 {
    	
    	private static final String APPENDED_DB_INFO = "?useUnicode=true&characterEncoding=UTF8" 
    										            + "&rewriteBatchedStatements=true" 
    										            + "&useLegacyDatetimeCode=false" 
    										            + "&serverTimezone=Asia/Shanghai"
    										            + "&useSSL=false";
    	private static final String classname		= "com.mysql.cj.jdbc.Driver";
    	private static final String url				= "jdbc:mysql://localhost:3306/snowflake" + APPENDED_DB_INFO;
    	private static final String username			= "root";
    	private static final String password    = "password";
    	
    	public static final String taskRootPath = "D:\snowflake\task";
    	
    	private Connection connection = null;
    	private File inputListFile = null;
    	private File reduceResultFile = null;
    	private File resultFile = null;
    	private int taskId;
    	
    	public void addInputPath(File file) throws IOException {
    		FileHelper.appendFile(inputListFile, file.getAbsolutePath() + "
    ");
    	}
    	
    	public void setKeyValuePair(String key, String value) throws Exception {
    		int id = -1;
    		Statement statement = connection.createStatement();
    		ResultSet resultSet = statement.executeQuery(String.format("select id from tmp" + taskId + " where kname='%s'", key.replaceAll("'", "''")));
    		if (resultSet.next()) {
    			id = resultSet.getInt(1);
    		}
    		else {
    			statement.execute(String.format("insert into tmp" + taskId + " (kname) values ('%s')", key.replaceAll("'", key.replaceAll("'", "''"))));
    			resultSet = statement.executeQuery(String.format("select id from tmp" + taskId + " where kname='%s'", key.replaceAll("'", "''")));
    			if (resultSet.next()) {
    				id = resultSet.getInt(1);
    			}
    		}
            if (id == -1) throw new Exception("set key value pair failed: key = " + key + ", value = " + value);
            File tmpFile = new File(taskRootPath + File.separator + taskId + File.separator + "tmp" + File.separator + id + ".txt");
            if (tmpFile.exists() == false) {
            	tmpFile.createNewFile();
            }
            FileHelper.appendFile(tmpFile, value + "
    ");
    	}
    	
    	public void addParamList(List<Map<String, String>> paramList) throws Exception {
    		String content = "";
    		Gson gson = new Gson();
    		for (Map<String, String> params : paramList) {
    			String jsonString = gson.toJson(params);
    			content += jsonString + "
    ";
    		}
    		FileHelper.appendFile(reduceResultFile, content);
    	}
    	
    	public void generateFile(String[] columns, String[] nameColumns) throws Exception {
    		if (reduceResultFile == null || reduceResultFile.exists() == false) {
    			throw new Exception("[mapreduce.v1] in generateFile function: reduceResultFile do not exist!");
    		}
    //		if (false) {	// test
    		if (reduceResultFile.length() > 1 * 1024 * 1024) {	// 如果文件大小超过1MB,导出成csv
    			resultFile = new File(taskRootPath + File.separator + taskId + File.separator + "result.csv");
    			
    			Gson gson = new Gson();
    			
    			BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(reduceResultFile), "UTF-8"));
    			FileOutputStream fos = new FileOutputStream(resultFile);
    	        OutputStreamWriter osw = new OutputStreamWriter(fos, "UTF-8");
    	        
    	        String content = "";
    	        for (int i = 0; i < nameColumns.length; i ++) {
    	        	if (i > 0)
    	        		content += ",";
    	        	content += '"' + nameColumns[i] + '"';
    	        }
    	        osw.write(content + "
    ");
    	        
    	        String line = null;
    	        while ((line = br.readLine()) != null) {
    	        	content = "";
    	        	Map<String, String> map = gson.fromJson(line, Map.class);
    	        	if (map == null) { throw new Exception("map is null by parsing line: " + line); }
    	        	for (int i = 0; i < columns.length; i ++) {
    	        		if (i > 0) content += ",";
    	        		String c = columns[i];
    	        		String v = map.get(c);
    	        		if (v != null) {
    	        			content += '"' + v + '"';
    	        		}
    	        	}
    	        	osw.write(content + "
    ");
    	        }
    	        br.close();
    	        osw.write(content);
    	        osw.flush();
    	        osw.close();
    		} else {	// 如果文件大小小于1MB,导出成Excel文件
    			resultFile = new File(taskRootPath + File.separator + taskId + File.separator + "result.xls");
    			
    			WritableWorkbook workbook = Workbook.createWorkbook(resultFile);
    			WritableSheet sheet = workbook.createSheet("sheet1", 0);
    			
    			BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(reduceResultFile), "UTF-8"));
    	        String line = null;
    	        
    	        for (int i = 0; i < nameColumns.length; i ++) {
    	        	sheet.addCell(new Label(i, 0, nameColumns[i]));
    	        }
    	        
    	        int rowId = 1;
    	        while ((line = br.readLine()) != null) {
    	        	Gson gson = new Gson();
    	            List<String> rowList = new ArrayList<String>();
    	            
    	            Map<String, String> map = gson.fromJson(line, Map.class);
    	        	if (map == null) { throw new Exception("map is null by parsing line: " + line); }
    	        	for (int i = 0; i < columns.length; i ++) {
    	        		String c = columns[i];
    	        		String v = map.get(c);
    	        		String innerContent = "";
    	        		if (v != null) {
    	        			innerContent = v;
    	        		}
    	        		sheet.addCell(new Label(i, rowId, innerContent));
    	        	}
                	rowId ++;
    	            
    	        }
    	        br.close();
    	        
    	        workbook.write();
    	        workbook.close();
    			
    		}
    	}
    	
    	public abstract void init() throws Exception;
    	
    	public abstract void map(String line) throws Exception;
    	
    	public abstract void reduce(String key, ReduceReader reduceReader) throws Exception;
    	
    	public abstract void generate() throws Exception;
    	
    	public String mapreduce() {
    		try {
    			Class.forName(classname);
    			connection = DriverManager.getConnection(url, username, password);
    			
    			// generate taskId
    			PreparedStatement preparedStatement = connection.prepareStatement("insert into task () values ()");
    			preparedStatement.execute("insert into task () values ()", PreparedStatement.RETURN_GENERATED_KEYS);
                ResultSet resultSet = preparedStatement.getGeneratedKeys();
                if (resultSet.next()) {
                	taskId = resultSet.getInt(1);
                }
                else {
                	throw new Exception("[mapreduce.v1] Exception: can not generate taskId");
                }
                // generated task file path
                String taskPath = taskRootPath + File.separator + taskId;
                File taskPathDir = new File(taskPath);
                if (taskPathDir.exists() == true) {
                	throw new Exception("[mapreduce.v1] Exception: task directory already exists");
                }
                taskPathDir.mkdirs();
                String tmpDirPath = taskPath + File.separator + "tmp";
                File tmpDir = new File(tmpDirPath);
                tmpDir.mkdirs();
                this.inputListFile = new File(taskPath + File.separator + "input_file_list.txt");
                inputListFile.createNewFile();
                // period. 1: init()
                // during init period, we will use addInputPath function to add all the input files we need
                init();
                
                // begin to read each line of each file
                // peroid. 2: map(line)
                
                // db prepare
                Statement statement = connection.createStatement();
                statement.execute("create temporary table tmp" + taskId + " ( id int not null auto_increment primary key, kname varchar(200) )");
                
                // file content prepare
                
                BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(inputListFile), "UTF-8"));
                String inputFilename = null;   
                while ((inputFilename = br.readLine()) != null) {
                    File inputFile = new File(inputFilename);
                    if (inputFile.exists() == false) {
                    	throw new Exception("[mapreduce.v1] Exception: input file " + inputFilename + " do not exists!");
                    }
                    BufferedReader br2 = new BufferedReader(new InputStreamReader(new FileInputStream(inputFile), "GBK"));
                    String line = null;
                    while ((line = br2.readLine()) != null) {
                    	map(line);
                    }
                }
                br.close();
                
                // period. 3: reduce(key, valueList)
                reduceResultFile = new File(taskPath + File.separator + "reduce.txt");
                if (reduceResultFile.exists() == true) {
                	throw new Exception("[mapreduce.v1] reduce file already exists!");
                }
                reduceResultFile.createNewFile();
                
    	        resultSet = statement.executeQuery("select * from tmp" + taskId);
    	        while (resultSet.next()) {
    	        	int id = resultSet.getInt(1);
    	        	String key = resultSet.getString(2);
    	        	File reduceFile = new File(tmpDirPath + File.separator + id + ".txt");
    	        	if (reduceFile.exists() == false) {
    	        		throw new Exception("[mapreduce.v1] Exception: reduce file " + reduceFile.getName() + " not exists!");
    	        	}
    	        	ReduceReader reduceReader = new ReduceReader(reduceFile);
    	        	reduce(key, reduceReader);
    	        }
    	        
    	        // period. 4: generate
    	        // generate the result file
    	        generate();
                
    	        connection.close();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		
    		if (resultFile == null) return null;
    		else return resultFile.getAbsolutePath();
    		
    	}
    	
    	
    	// main for test
    	public static void main(String[] args) {
    		MapReduceBaseVersion1 mapReduceBaseVersion1 = new MapReduceBaseVersion1() {
    			
    			@Override
    			public void reduce(String key, ReduceReader reduceReader) throws Exception {
    				// TODO Auto-generated method stub
    				List<Map<String, String>> paramList = new ArrayList<Map<String,String>>();
    				
    				String line;
    				while ( (line = reduceReader.next()) != null ) {
    					List<String> rowList = CsvOneLineParser.parseLine(line);
    					Map<String, String> tmpMap = new HashMap<String, String>();
    					int idx = 0;
    					for (String s : rowList) {
    						idx ++;
    						tmpMap.put("" + idx, s);
    					}
    					paramList.add(tmpMap);
    				}
    				addParamList(paramList);
    			}
    			
    			@Override
    			public void map(String line) throws Exception {
    				// TODO Auto-generated method stub
    				setKeyValuePair(line.substring(1, 3), line);
    			}
    			
    			@Override
    			public void init() throws Exception {
    				// TODO Auto-generated method stub
    				addInputPath(new File("D:\test\test.del"));
    			}
    			
    			@Override
    			public void generate() throws Exception {
    				// TODO Auto-generated method stub
    				generateFile(new String[] { "1", "2", "3", "4", "5", "6" }, new String[] { "一", "二", "三", "四", "五", "六" });
    			}
    		};
    		System.out.println(mapReduceBaseVersion1.mapreduce());
    	}
    }
    
    
  • 相关阅读:
    垂直同步
    C++ RAII
    C++ RAII
    LCD刷新率和垂直同步的设置
    ping结果中TTL是什么意思
    垂直同步
    stage.frameRate改变帧频
    ping结果中TTL是什么意思
    stage.frameRate改变帧频
    ping 命令的原理,揭开单向“Ping”通的奥秘
  • 原文地址:https://www.cnblogs.com/zifeiy/p/9451515.html
Copyright © 2011-2022 走看看