zoukankan      html  css  js  c++  java
  • 应用策略设计模式

           设计模式:设计模式(Design pattern)是一套被重复使用、多数人知晓的、经过分类编目的、代码设计经验的总结。

    使用设计模式是为了可重用代码、让代码更easy被他人理解、保证代码可靠性。 毫无疑问,设计模式于己于他人于系统都是多赢的。设计模式使代码编制真正project化。设计模式是软件project的基石脉络。如同大厦的结构一样。

           在最開始学习设计模式时。看着书籍,总是认为设计模式非常easy。但是。每次在实战的时候。常常渴望可以用上设计模式,但总是无法第一时间应用上某种设计模式。

    明明自己已经知晓了多个设计模式。看书也认为通俗易懂,为啥呢?好吧。经过不断学习,总算找到了一点原因:写书使用的样例比較简单以及理想化,而且作者牛逼。好啦,废话不多说。看看自己应用策略设计模式的详细案例。

           在这里,使用的是策略设计模式。什么是策略设计模式呢?策略模式定义了一系列的算法,并将每个算法封装起来,并且使它们还能够相互替换。策略模式让算法独立于使用它的客户而独立变化。

           这里的代码主要是在分析hdfs中的nginxlog日志文件,将每一个日志文件的參数抽取出来。

    比方以下四个链接:

    <span style="font-size:18px;">/index.html?q=download&bid=0&app_id=857&version_id=9736&channel_id=1&down_time=1425889447&platform=1&ip=1921718005&down_from=1 HTTP/1.1
    
    index.html?

    q=action&time=REQUEST_TIME&device_id=1764126fa512b2dd&app_id=7759&action=showtoolbar&package=com.good.world2&platform=1&sdk_version=&addon_version=1.3.6.160 HTTP/1.1"; GET /index.html?aid=5905&android_id=AE66F243-9F6A-491A-AD16-8EC7652DBAEC&product_model=iPhone+6+Plus&system_version=8.1.3&is_supported=1&add_time=1425538167&channel_id=1&platform=2&sdk_version=&addon_version=&q=active HTTP/1.1" 200 612 "-" "-" "-" /index.html?

    q=upload&asset_id=24726878&title=%E6%88%91%E7%9A%84%EF%BB%BF%E6%80%92%E7%81%AB%E6%88%98%E7%A5%9E%E8%A7%86%E9%A2%91%EF%BC%8C%E4%B8%8D%E7%9C%8B%E5%88%AB%E5%90%8E%E6%82%94%E5%93%A6%EF%BC%81&bid=33756344&upload_net=wifi&product_model=GT-N7100&system_version=4.1.2&app_id=5460&save_time=1425890861&device_id=f8ee017774daaf98&channel_id=1&platform=1&sdk_version=&addon_version= HTTP/1.1</span>


           抽取出来的结果:

    <span style="font-size:18px;">{platform=1, down_time=1425889447, q=download, app_id=857, down_from=1, version_id=9736, bid=0, channel_id=1, ip=1921718005}
    
    {platform=1, time=request_time, q=action, package=com.good.world2, action=showtoolbar, sdk_version=, app_id=7759, device_id=1764126fa512b2dd, addon_version=1.3.6.160 }
    
    {platform=2, android_id=ae66f243-9f6a-491a-ad16-8ec7652dbaec, q=active, product_model=iphone+6+plus, is_supported=1, sdk_version=, add_time=1425538167, aid=5905, channel_id=1, addon_version=, system_version=8.1.3}
    
    {asset_id=24726878, platform=1, product_model=gt-n7100, sdk_version=, device_id=f8ee017774daaf98, channel_id=1, addon_version=, title=%e6%88%91%e7%9a%84%ef%bb%bf%e6%80%92%e7%81%ab%e6%88%98%e7%a5%9e%e8%a7%86%e9%a2%91%ef%bc%8c%e4%b8%8d%e7%9c%8b%e5%88%ab%e5%90%8e%e6%82%94%e5%93%a6%ef%bc%81, save_time=1425890861, q=upload, upload_net=wifi, app_id=5460, bid=33756344, system_version=4.1.2}</span>
    
           

           分析的原理非常easy,通过递归方式,读取全部的nginxlog文件。然后解析每一条url。取出參数。最后将数据进行归类。最后又一次写入到各自的文件里。在最開始写代码是,非常自然的,自己使用了多个if{}else if{}来解决这个问题。

           

    <span style="color:#333333;">package net.itaem.fs;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.io.PrintStream;
    import java.net.URI;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    /**
     * 使用hdfs来etl出hive的数据格式
     * 这里面须要区分出多种接口的数据,眼下一共同拥有四个接口数据
     * 
     * 因为有可能会重新启动这个类,所以必须进行每一个文件仅仅被etl一次的保证
     * 问了达到该目的,这里使用日志的策略,也就是被etl过的文件名称。保存到一个日志文件里
     * 假设某个目录已经被etl完毕,那么就将整个文件记录到日志目录
     * 
     * 每次启动时,程序读取日志文件
     * 每次进行数据解析时。会首先推断一个目录是否被etl,假设没有,递归读取该目录以下的全部文件,而且推断该文件是否被etl过。假设没有,进行etl;
     * 假设已经被etl过。直接跳过
     * 
     * 
     * 所以这里使用两个日志文件,一个用于保存被etl过的目录,另外一个保存被etl过的文件名称
     * 
     * @author luohong
     * @author 2015-03-06
     * @author 846705189@qq.com
     * */
    public class ETLLog {
    
    	/**
    	 * hdfs配置
    	 * */
    	private Configuration conf = null;
    
    	private FileSystem fs = null;
    
    	/**
    	 * 日期格式化类。主要用来对数据进行依照日期分类
    	 * */
    	private SimpleDateFormat sdf = null;
    
    	/**
    	 * 被读取过的目录日志文件
    	 * */
    	private String readedDirectoryPath = "E:\readedDirectory.txt";
    
    	/**
    	 * 被读取过的文件日志文件
    	 * */
    	private String readedFilePath = "E:\readedFile.txt";
    
    	/**
    	 * 将日志存入到list中
    	 * */
    	private Set<String> readedDirectorySet;
    	private Set<String> readedFileSet;
    
    	/**
    	 * 输出日志
    	 * */
    	private LogWriter writer;
    
    	/**
    	 * 读取日志
    	 * */
    	private LogReader reader;
    
    
    	/**
    	 * hdfs根路径
    	 * */
    	private String basePath = "hdfs://192.168.1.111:9000/data/flume/flume";
    
    	/**
    	 * hdfs输出路径
    	 * 这里的路径是输出数据的根路径
    	 * 眼下一共同拥有四个接口数据,所以使用四个目录来存放数据
    	 * */
    	private String outputPath = "hdfs://192.168.1.111:9000/output/hive/aaa.txt";
    
    	/**
    	 * 接口目录名称
    	 * */
    	private String installation = "installation";
    	private String video = "video";
    	private String down = "down";
    	private String from_app = "from_app";
    
    	/**
    	 * 安装与活跃统计接口 installation 接口目录路径
    	 * */
    	private String installationOutputPath = outputPath + "/" + installation;
    
    
    	/**
    	 * 行为统计接口 video  接口目录路径
    	 * */
    	private String videoOutputPath = outputPath + "/" + video;
    
    	/**
    	 * 下载统计接口 down 接口目录路径
    	 * */
    	private String downOutputPath = outputPath + "/" + down;
    
    	/**
    	 * 上传视频接口 from_app 接口目录路径
    	 * */
    	private String from_appOutputPath = outputPath + "/" + from_app;
    
    
    	/**
    	 * 用来存放全部解析后的数据
    	 * */
    	private List<String> cache = null;
    
    	/**
    	 * 使用map来存放全部的数据。这里数据一共同拥有四种接口数据。所以key的值也共同拥有四种
    	 * 
    	 * */
    	private Map<String, String> dataMap = null;
    
    	/**
    	 * 使用模板方法来简化调用
    	 * @throws IOException 
    	 * */
    	private void process() throws IOException{
    		System.out.println("begin setup");
    		setUp();
    		System.out.println("finish setup");
    
    		System.out.println("begin run");
    		run();
    		System.out.println("finish run");
    
    		System.out.println("begin finish");
    		finish();
    		fs.close();
    		writer.close();  //这时候整个递归已经完毕,关闭输出流
    		System.out.println("finish finish");
    	}
    
    	/**
    	 * 初始化
    	 * @throws IOException 
    	 * */
    	private void setUp() throws IOException{
    
    		conf = new Configuration();
    		fs = FileSystem.get(URI.create(basePath), conf);
    
    		cache = new ArrayList<String>();
    		sdf = new SimpleDateFormat("yyyy-MM--dd");
    
    
    		//初始化reader
    		reader = new LogReader(readedDirectoryPath, readedFilePath);
    		System.out.println(reader);
    
    		//初始化writer
    		writer = new LogWriter(readedDirectoryPath, readedFilePath);
    		System.out.println(writer);
    		//初始化日志
    		readedFileSet = reader.readFile();
    		readedDirectorySet = reader.readDirectory();
    
    		System.out.println("========================readed directory==============================");
    		System.out.println("readed directory size:" + readedDirectorySet.size());
    		System.out.println("========================readed directory==============================");
    
    		System.out.println("===========================readed file================================");
    		System.out.println("readed file size:" + readedFileSet.size());
    		System.out.println("===========================readed file================================");
    	}
    
    	/**
    	 * 读取文件,而且解析出数据内容
    	 * */
    	private void run(){
    		try {
    			read(basePath);
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * 结束
    	 * 
    	 * 这里将解析到的全部数据放入到hdfs,然后再使用hive建立一个相应的表格就可以
    	 * @throws IOException 
    	 * 
    	 * */
    	private void finish() throws IOException{
    		fs = FileSystem.get(URI.create(basePath), conf);
    
    		OutputStream outStream = fs.create(new Path(outputPath));
    		PrintStream out = new PrintStream(outStream);
    
    		for(String line: cache){
    			out.println(line);
    		}
    
    		out.close();
    		outStream.close();
    		fs.close();
    	}
    
    	/**
    	 * 
    	 * 读取一个目录以下的全部数据
    	 * 这里会递归调用
    	 * 而且将数据进行etl,然后保存在cache集合中
    	 * 
    	 * @param path目录的路径
    	 * */
    	private void read(String path) throws IOException{
    		System.out.println(path);
    		if(path == null || "".equals(path)) return;
    
    		fs = FileSystem.get(URI.create(path), conf);
    
    		if(!fs.exists(new Path(path))) return;
    
    		FileStatus fileList[] = fs.listStatus(new Path(path));
    
    		if(fileList.length == 0) return;
    
    		int fileNum = fileList.length;
    		InputStream hdfsInStream = null;
    		BufferedReader reader = null;
    		String line = null;
    		StringBuilder sb = null;
    		Map<String, String> paramMap = null;
    
    		// 遍历一个目录以下的全部文件
    		for(int fileCount = 0; fileCount < fileNum; fileCount++){
    			String fileName = path + "/" + fileList[fileCount].getPath().getName();
    
    			//该文件是目录,递归调用
    			if(fs.isDirectory(new Path(fileName))) {
    				//推断该目录是否已经被解析过,假设是,那么跳过该目录
    //				if(readedDirectorySet.contains(fileName)){
    //					continue;
    //				}
    				read(fileName);
    				
    				//记录下一个目录被etl。可是不能记录下根目录
    //				if(!fileName.equals(basePath)){
    //					writer.pintlnDirectory(fileName);
    //				}
    			}else{
    				if(!fs.exists(new Path(fileName))) continue;
                    
    				
    				//假设文件被解析过了,那么跳过该文件
    				if(readedFileSet.contains(fileName)){
    					continue;
    				}
    				
    				//普通文件,读取文件内容
    				hdfsInStream = fs.open(new Path(fileName));
    				reader = new BufferedReader(new InputStreamReader(hdfsInStream));
                    
    				while((line = reader.readLine()) != null){
    					if(line == null || line.equals("") || line.isEmpty()){
    						continue;
    					}
                        
    					//取出一个请求的数据
    					paramMap = CRequest.URLRequest(line);
    					sb = new StringBuilder(); 
    					String type = "";
    					for(String param: paramMap.keySet()) {
    						if(param.equals("q") && paramMap.get(param).equals("down")){
    							type = "down";
    							continue;
    						}else if(param.equals("q") && paramMap.get(param).equals("from_app")){
    							type = "from_app";
    							continue;
    						}else if(param.equals("q") && paramMap.get(param).equals("action")){
    							type = "action";
    							continue;
    						}else if(param.equals("q") && paramMap.get(param).equals("active")){
    							type = "active";
    							continue;
    						}else{
    							System.out.println("other type");
    						}
    						
    						String value = paramMap.get(param);
    						sb.append(value + " ");
    					}
    					
    					
    					if(type.equals("down")){
    						//save the param value to down file
    					}else if(type.equals("from_app")){
    						//save the param value to from_app
    					}else if(type.equals("action")){
    						//save the param value to action
    					}else if(type.equals("active")){
    					    //save the param value to active
    					}else{
    						System.out.println("other type");
    					}
    					
    					//将数据存放入cache。下次一次性刷入一个hdfs文件里
    					cache.add(sb.toString());
    				}
    
    				//关闭stream
    				hdfsInStream.close();
    				//关闭reader
    				reader.close();
    				//记录下一个文件被etl
    				writer.pintlnFile(fileName);
    			}
    		}
    	}
    
    	/**
    	 * 启动定时器,定期运行数据的etl,这里设置在每天凌晨1点钟运行数据转换
    	 * */
    	public static void main(String[] args) throws IOException {
    		new ETLLog().process();
    	}
    }
    </span>
           从红色区域我们能够看到。代码中使用了多个if{}else if{}。假设url的数据一直都仅仅有四种类型,那么这还没有啥,可是假设url类型加多10个呢?那我们的代码修改将会许多。很不利于后期维护。

           于是乎。我就想起了设计模式。从代码中我们能够发现,非常适合使用策略设计模式。

    哈哈,为什么会非常适合策略设计模式呢?这个大家能够看看《代码重构》,里面教授了几种方式。让我们发现坏代码的味道。在这里,将所有url类别抽象为一个抽象Event,然后每个详细的url类别作为一个Event子类就可以。

    在代码中,不再使用if{}else if{}来推断,而是使用面向接口:Event event = new EventImple()的变成方式来表示每个Event,然后将event放入List<Event>就可以。

           

    <span style="font-size:18px;">package com.aipai.luohong.event;
    
    /**
     * 定义一个类,用来表示全部的接口数据。每一个接口都属于一个event
     * 每一个event通过type来区分
     * 因为每一个event的保存路径都是不一样的。所以这里使用抽象方法来定义一个统一接口
     * @author luohong
     * @author 2015-03-09
     * @author 846705189@qq.com
     * 
     * */
    public abstract class Event {
        
    	//默认输出路径
    	private String basePath = "hdfs://ip-192-168-1-203:8020";
    	
    	private String type;
    	
    	public Event() {
    	}
    	
    	public Event(String basePath){
    		this.basePath = basePath;
    	}
    	
    	public String getBasePath() {
    		return basePath;
    	}
    
    	public void setBasePath(String basePath) {
    		this.basePath = basePath;
    	}
        
    
    	public String getType(){
    		return type;
    	}
    
    	public void setType(String type) {
    		this.type = type;
    	}
    	
    	public abstract String getOutputPath();
    }</span>


           

    package com.aipai.luohong.event;
    /**
     * 下载统计接口 q=download
     * @author luohong
     * @author 2015-03-09
     * @author 846705189@qq.com
     * 
     * */
    public class DownloadEvent extends Event {
    	private String outputPath = "/download/" + "current.txt";
        
    	public DownloadEvent(){
    		
    	}
    	
    	public DownloadEvent(String outputPath){
    		this.outputPath = outputPath;
    	}
    	
    	@Override
    	public String getOutputPath() {
    		return super.getBasePath() + outputPath;
    	}
    }
    
          每个url类别,就建立一个Event子类就可以。

    可是经过这样子改动之后。我们能够写一个配置文件,然后将每个Event,以及event相应的參数内容等信息配置在配置文件里,为以后迁移代码提供更大的灵活性。properties文件例如以下:

           

    <span style="font-size:18px;">basePath=hdfs://192.168.1.111:9000
       flumeDataPath=hdfs://192.168.1.111:9000/usr
       interval=5
       separator=,
       fileSize=5
       events=DownloadEvent,InstallationEvent,UploadEvent,VideoEvent
       eventTypes=down,active,from_app,action
    </span>
          详细參数意义大概就是用于配置hdfs的url,数据源。间隔时间,分隔符,文件大小,events就是有多少种Event,eventTypes相应events。通过这样的方式。代码的灵活性将会高不少,以下是详细代码。

    有了这个配置文件,以及框架代码,我们如果新加入一个DeleteEvent。那我们不须要修改不论什么代码。仅仅须要加入一个DeleteEvent。然后修改配置文件为:

    <span style="font-size:18px;">basePath=hdfs://192.168.1.111:9000
       flumeDataPath=hdfs://192.168.1.111:9000/usr
       interval=5
       separator=,
       fileSize=5
       events=DownloadEvent,InstallationEvent,UploadEvent,VideoEvent,DeleteEvent
       eventTypes=down,active,from_app,action,delete</span>
           程序会自己主动检測到Event以及相应的參数类型,大大简便我们的后期开发和维护。

          

    <span style="font-size:18px;"><span style="color:#333333;">package com.aipai.luohong;
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.io.PrintStream;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    import java.util.Timer;
    import java.util.TimerTask;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    import com.aipai.luohong.event.Event;
    
    /**
     * 使用hdfs来etl出hive的数据格式
     * 这里面须要区分出多种接口的数据,眼下一共同拥有四个接口数据
     * 
     * 因为有可能会重新启动这个类,所以必须进行每一个文件仅仅被etl一次的保证
     * 问了达到该目的,这里使用日志的策略,也就是被etl过的文件名称。保存到一个日志文件里
     * 假设某个目录已经被etl完毕,那么就将整个文件记录到日志目录
     * 
     * 每次启动时,程序读取日志文件
     * 每次进行数据解析时。会首先推断一个目录是否被etl,假设没有。递归读取该目录以下的全部文件,而且推断该文件是否被etl过,假设没有,进行etl。
     * 假设已经被etl过。直接跳过
     * 
     * 所以这里使用两个日志文件,一个用于保存被etl过的目录,另外一个保存被etl过的文件名称
     * 
     * 
     * 眼下使用每间隔一段时间便解析数据,所以眼下会遍历全部的目录以下的数据,推断文件是否被解析过,假设是,那么就跳过文件
     * 
     * 输出文件策略使用的是:每一个文件大小由參数配置。默觉得128M,达到阀值时,生成新文件
     * 
     * @author luohong
     * @author 2015-03-06
     * @author 846705189@qq.com
     * */
    public class ETLLog {
    
    	/**
    	 * hdfs配置
    	 * */
    	private Configuration conf = null;
    
    	/**
    	 * hdfs文件系统
    	 * */
    	private FileSystem fs = null;
    
    	/**
    	 * 被读取过的目录日志文件
    	 * */
    	private String readedDirectoryPath = "/var/log/etl_log/readedDirectory.txt";
    
    	/**
    	 * 被读取过的文件日志文件
    	 * */
    	private String readedFilePath = "/var/log/etl_log/readedFile.txt";
    
    	/**
    	 * 将日志存入到list中
    	 * */
    	private Set<String> readedDirectorySet;
    	private Set<String> readedFileSet;
    
    	/**
    	 * 输出日志
    	 * */
    	private LogWriter writer;
    
    	/**
    	 * 读取日志
    	 * */
    	private LogReader reader;
    
    	/**
    	 * 分隔符,默认使用‘,’
    	 * */
    	private String separator = ",";
    
    	/**
    	 * 输出文件大小
    	 * */
    	private int fileSize = 128;
    
    	/**
    	 * 间隔时间
    	 * */
    	private int interval = 5;
    
    	/**
    	 * hdfs根路径
    	 * */
    	private String basePath = "hdfs://ip-192-168-1-203:8020";
    
    	private String flumeDataPath = "hdfs://ip-192-168-1-203:8020/usr/flume";
    
    	private String currentFile = "current.txt";
    
    	/**
    	 * 事件接口列表
    	 * */
    	private List<Class<?

    extends Event>> eventList = null; /** * 事件类型 * */ private List<String> eventTypeList = null; /** * 使用map来存放全部的数据。这里数据一共同拥有四种接口数据,所以key的值也共同拥有四种 * * */ private Map<String, List<String>> dataMap = null; /** * 计算即将要加入数据的字节长度 * */ private Map<String, Long> dataLengthMap = null; //提供getXxx接口,用于訪问配置信息 public int getInterval() { return interval; } public String getBasePath() { return basePath; } public String getSeparator(){ return separator; } public int getFileSize(){ return fileSize; } public String getFlumeDataPath() { return flumeDataPath; } /** * 使用模板方法来简化调用 * @throws IOException * @throws ClassNotFoundException * @throws IllegalAccessException * @throws InstantiationException * */ private void process() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException{ System.out.println("begin setup"); setUp(); System.out.println("finish setup"); System.out.println("begin run"); run(); System.out.println("finish run"); System.out.println("begin finish"); finish(); fs.close(); writer.close(); //这时候整个递归已经完毕。关闭输出流 System.out.println("finish finish"); } /** * 初始化 * @throws IOException * @throws ClassNotFoundException * */ @SuppressWarnings("unchecked") private void setUp() throws IOException, ClassNotFoundException{ dataMap = new HashMap<String, List<String>>(); dataLengthMap = new HashMap<String, Long>(); </span><span style="color:#ff0000;">eventList = new ArrayList<Class<?

    extends Event>>(); eventTypeList = new ArrayList<String>(); //载入配置文件 InputStream inStream = new FileInputStream("/var/log/etl_log/properties"); Properties properties = new Properties(); properties.load(inStream); String basePath = properties.getProperty("basePath"); String flumeDataPath = properties.getProperty("flumeDataPath"); String interval = properties.getProperty("interval"); String separator = properties.getProperty("separator"); String fileSize = properties.getProperty("fileSize"); String events = properties.getProperty("events"); String eventTypes = properties.getProperty("eventTypes");</span><span style="color:#333333;"> if(basePath != null && !"".equals(basePath)){ this.basePath = basePath; } if(interval != null && !"".equals(interval)){ this.interval = Integer.valueOf(interval); } if(separator != null && !"".equals(separator)){ this.separator = separator; } if(fileSize != null && !"".equals(fileSize)){ this.fileSize = Integer.valueOf(fileSize); } if(flumeDataPath != null && !"".equals(flumeDataPath)){ this.flumeDataPath = flumeDataPath; } if(events != null && !"".equals(events)){ for(String eventStr: events.split(",")){ String eventClassName = "com.aipai.luohong.event." + eventStr; Class<? extends Event> event = (Class<? extends Event>) Class.forName(eventClassName); eventList.add(event); List<String> cache = new ArrayList<String>(); //每种event都会使用一种缓存和一个计数器 dataMap.put(eventClassName, cache); dataLengthMap.put(eventClassName, 0l); } System.out.println(eventList); } if(eventTypes != null && !"".equals(eventTypes)){ for(String eventType: eventTypes.split(",")){ eventTypeList.add(eventType); } System.out.println(eventTypeList); } conf = new Configuration(); fs = FileSystem.get(URI.create(basePath), conf); //初始化reader reader = new LogReader(readedDirectoryPath, readedFilePath); System.out.println(reader); //初始化writer writer = new LogWriter(readedDirectoryPath, readedFilePath); System.out.println(writer); //初始化日志 readedFileSet = reader.readFile(); readedDirectorySet = reader.readDirectory(); System.out.println("========================readed directory=============================="); System.out.println(readedDirectorySet.size()); System.out.println("========================readed directory=============================="); System.out.println("===========================readed file================================"); System.out.println(readedFileSet.size()); System.out.println("===========================readed file================================"); } /** * 读取文件,而且解析出数据内容 * */ private void run(){ try { read(flumeDataPath); } catch (IOException e) { e.printStackTrace(); } } /** * 将缓存数据写入到hdfs * * @throws IOException * @throws ClassNotFoundException * @throws IllegalAccessException * @throws InstantiationException * * */ @SuppressWarnings("deprecation") private void finish() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException{ fs = FileSystem.get(URI.create(basePath), conf); OutputStream outStream = null; PrintStream out = null; /** * * * */ for(String key: dataMap.keySet()){ List<String> dataList = dataMap.get(key); Long dataSize = dataLengthMap.get(key); System.out.println(key + " data size is " + dataList.size()); Class<?> eventClass = Class.forName(key); Event event = (Event) eventClass.newInstance(); event.setBasePath(basePath + "/output/hive"); if(!fs.exists(new Path(event.getOutputPath()))){ //假设数据不存在,创建新文件 outStream = fs.create(new Path(event.getOutputPath())); }else{ //追加数据 long length = fs.getLength(new Path(event.getOutputPath())); if(length + dataSize <= fileSize * 1000 * 1000){ //假设数据没有超过阀值,那么加入到一个文件里,假设超过了,新建一个新文件 outStream = fs.append(new Path(event.getOutputPath())); }else{ //数据超过阀值,那么新建一个文件,将原来current.txt文件内容复制过去,清空current.txt String str = event.getOutputPath().replaceAll(currentFile, "") + new Date().getTime() + ".txt"; //使用时间戳来生成文件 fs.rename(new Path(event.getOutputPath()), new Path(str)); //拷贝文件 //将current.txt清空,数据又一次写入到该文件 fs.delete(new Path(event.getOutputPath())); outStream = fs.create(new Path(event.getOutputPath())); } } out = new PrintStream(outStream); for(String data: dataList){ out.println(data); } } out.close(); outStream.close(); fs.close(); } /** * * 读取一个目录以下的全部数据 * 这里会递归调用 * 而且将数据进行etl。然后保存在cache集合中 * * @param path目录的路径 * */ private void read(String path) throws IOException{ if(path == null || "".equals(path)) return; fs = FileSystem.get(URI.create(path), conf); if(!fs.exists(new Path(path))) return; FileStatus fileList[] = fs.listStatus(new Path(path)); if(fileList.length == 0) return; int fileNum = fileList.length; InputStream hdfsInStream = null; BufferedReader reader = null; String line = null; StringBuilder sb = null; Map<String, String> paramMap = null; // 遍历一个目录以下的全部文件 for(int fileCount = 0; fileCount < fileNum; fileCount++){ String fileName = path + "/" + fileList[fileCount].getPath().getName(); //该文件是目录,递归调用 if(fs.isDirectory(new Path(fileName))) { // if(readedDirectorySet.contains(fileName)){ // continue; // } read(fileName); //记录下一个目录被etl //writer.pintlnDirectory(fileName); }else{ if(!fs.exists(new Path(fileName))) continue; //假设文件被解析过了。那么跳过该文件 if(readedFileSet.contains(fileName)){ continue; } //普通文件,读取文件内容 hdfsInStream = fs.open(new Path(fileName)); reader = new BufferedReader(new InputStreamReader(hdfsInStream)); while((line = reader.readLine()) != null){ if(line == null || line.equals("") || line.isEmpty()){ continue; } paramMap = CRequest.URLRequest(line); sb = new StringBuilder(); int index = 0; for(String param: paramMap.keySet()) { if(param.equals("q")){ //用于划分接口參数 index = eventTypeList.indexOf(paramMap.get(param)); if(index == -1) System.out.println(paramMap.get(param)); }else{ //其它參数 String value = paramMap.get(param); sb.append(value + separator); } } String str = ""; if(sb.length() > 1){ str = sb.substring(0, sb.length() - 1); } if(index == -1){ continue; } Class<? extends Event> event = eventList.get(index); dataMap.get(event.getCanonicalName()).add(str); dataLengthMap.put(event.getCanonicalName(), dataLengthMap.get(event.getCanonicalName()) + str.getBytes().length); } //关闭stream hdfsInStream.close(); //关闭reader reader.close(); //记录下一个文件被etl writer.pintlnFile(fileName); } } } public static void main(String[] args) throws IOException { final ETLLog etlLog = new ETLLog(); //启动定时器 new Timer().schedule(new TimerTask(){ @Override public void run() { try { etlLog.process(); } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) { e.printStackTrace(); } } }, 1000, etlLog.getInterval() * 60 * 1000); } } </span></span>


           总结:在开发过程中。设计模式并非那么清晰明显的。须要我们不断的去重构代码。

    好吧,重构花了我大概一个下午的时间。可是最后看到代码变得好起来之后,还是值得的。好吧,继续努力,继续逗比去。


    版权声明:本文博客原创文章,博客,未经同意,不得转载。

  • 相关阅读:
    npm version patch
    nginx 操作
    基于 Vue CLI 组件库封装,按需加载实现
    nginx 配置文件路径获取
    Laravel 生产资源路由并指定模型
    base.js,通用js方法,Js方法封装
    jquery.params.js,Jquery获取页面参数,js获取页面参数
    layui使用,LayUI select不显示,LayUI文件上传,Layui自定义校验规则
    Layer弹窗消息封装,Layer消息提示封装,Layer使用
    Html跨域js封装,前端页面跨域js,postMessage实现跨域
  • 原文地址:https://www.cnblogs.com/lcchuguo/p/4735059.html
Copyright © 2011-2022 走看看