zoukankan      html  css  js  c++  java
  • 数据清洗

    1、 数据清洗:按照要求进行数据清洗,并将清洗后的数据导入hive数据库中

    两阶段数据清洗:

    (1)第一阶段:把需要的信息从原始日志中提取出来

    ip:    199.30.25.88

    time:  10/Nov/2016:00:01:03 +0800

    traffic:  62

    文章: article/11325

    视频: video/3235

    (2)第二阶段:根据提取出来的信息做精细化操作

    ip--->城市 cityIP

    date--> time:2016-11-10 00:01:03

    day: 10

    traffic:62

    type:article/video

    id:11325

    (3)hive数据库表结构:

    create table data(  ip string,  time string , day string, traffic bigint,

    type string, id   string )

    通过mapreduce中的map程序对源数据进行去除逗号和转化日期格式,采用跟mysql相同的解决方式对hive数据库进行增加数据。

    源代码:

    mapreduce类







    package MapReduceMethod; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Locale; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import Entity.datainfo; import Service.Service; public class QingxiRuku { static Service service=new Service(); public static final SimpleDateFormat FORMAT = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH); //原时间格式 public static final SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//现时间格式 private static Date parseDateFormat(String string) { //转换时间格式 Date parse = null; try { parse = FORMAT.parse(string); } catch (Exception e) { e.printStackTrace(); } return parse; } public static class Map extends Mapper<LongWritable , Text , Text , Text>{ public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{ String line=value.toString(); System.out.println(line); String arr[]=line.split(","); Date date = parseDateFormat(arr[1]); context.write(new Text(arr[0]), new Text(dateformat1.format(date)+","+arr[2]+","+arr[3]+","+arr[4]+","+arr[5])); } } public static class Reduce extends Reducer<Text, Text, Text, Text>{ static datainfo info=new datainfo(); public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ for (Text value : values) { String[] strNlist = value.toString().split(","); info.setTime(strNlist[0]); info.setDay(strNlist[1]); 
info.setTraffic(strNlist[2]); info.setType(strNlist[3]); info.setId(strNlist[4]); service.add("data", info); context.write(new Text(key), new Text(value)); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ Configuration conf=new Configuration(); System.out.println("start"); Job job =new Job(conf,"QingxiRuku"); job.setJarByClass(QingxiRuku.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); //设置map输出的key类型 job.setMapOutputKeyClass(Text.class); //设置map输出的value类型 job.setMapOutputValueClass(Text.class); //设置输出的key类型 job.setOutputKeyClass(Text.class); //设置输出的value类型 job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path in=new Path("hdfs://localhost:9000/mymapreduce1/in/result"); Path out=new Path("hdfs://localhost:9000/mymapreduce1/test"); FileInputFormat.addInputPath(job,in); FileOutputFormat.setOutputPath(job,out); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

      DBUtil类 链接数据库:

    package DBUtil;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    
    import Entity.EntityToString;
    import Service.Service;
    
    /**
     * 数据库连接工具
     * @author Hu
     *
     */
    /**
     * JDBC connection utility for the Hive warehouse.
     *
     * @author Hu
     */
    public class DBUtil {
    	
    	// HiveServer2 JDBC URL. The original value was
    	// "jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true",
    	// which made DriverManager connect to a MySQL database named "hive"
    	// instead of Hive itself — that is why inserts failed with
    	// "Table 'hive.data' doesn't exist" even though the table exists in
    	// Hive. Port 10000 is HiveServer2's default; adjust if configured
    	// differently.
    	public static String db_url = "jdbc:hive2://localhost:10000/default";
    	public static String db_user = "hive";
    	public static String db_pass = "hive";
    	
    	/**
    	 * Opens a connection to HiveServer2.
    	 *
    	 * @return an open Connection, or null when the driver is missing or the
    	 *         connection fails — callers must check for null
    	 */
    	public static Connection getConn () {
    		Connection conn = null;
    		
    		try {
    			Class.forName("org.apache.hive.jdbc.HiveDriver"); // load Hive JDBC driver
    			
    			conn = DriverManager.getConnection(db_url, db_user, db_pass);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		
    		return conn;
    	}
    	
    	/**
    	 * Closes a statement and connection, ignoring nulls; close failures are
    	 * logged and do not propagate.
    	 *
    	 * @param state statement to close, may be null
    	 * @param conn  connection to close, may be null
    	 */
    	public static void close (Statement state, Connection conn) {
    		if (state != null) {
    			try {
    				state.close();
    			} catch (SQLException e) {
    				e.printStackTrace();
    			}
    		}
    		
    		if (conn != null) {
    			try {
    				conn.close();
    			} catch (SQLException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    	
    	/**
    	 * Closes a result set, statement and connection, ignoring nulls; close
    	 * failures are logged and do not propagate.
    	 *
    	 * @param rs    result set to close, may be null
    	 * @param state statement to close, may be null
    	 * @param conn  connection to close, may be null
    	 */
    	public static void close (ResultSet rs, Statement state, Connection conn) {
    		if (rs != null) {
    			try {
    				rs.close();
    			} catch (SQLException e) {
    				e.printStackTrace();
    			}
    		}
    		
    		if (state != null) {
    			try {
    				state.close();
    			} catch (SQLException e) {
    				e.printStackTrace();
    			}
    		}
    		
    		if (conn != null) {
    			try {
    				conn.close();
    			} catch (SQLException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    	/**
    	 * Manual smoke test: verifies a connection can be obtained.
    	 * (Removed the dead commented-out code and unused locals that were here.)
    	 */
    	public static void main(String[] args) throws SQLException {
    		Connection conn = getConn();
    		System.out.println(conn != null ? "connected" : "connection failed");
    		close(null, conn);
    	}
    }
    

      Dao层  操作数据库类

    package Dao;
    
    import java.lang.reflect.Field;
    import java.lang.reflect.Modifier;
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import DBUtil.DBUtil;
    import Entity.EntityToString;
    
    
    /**
     * 通用类Dao
     * Dao层操作数据
     * @author HDQ
     *
     */
    /**
     * 通用类Dao
     * Dao层操作数据
     * @author HDQ
     */
    public class Dao {
    
    	/**
    	 * Inserts one row into the given table, using the entity's reflected
    	 * field names as columns and its field values as the row data.
    	 *
    	 * Values are bound through PreparedStatement parameters instead of being
    	 * concatenated into the SQL string — the original built
    	 * "values('a','b',...)" by concatenation, which breaks on values
    	 * containing quotes and is open to SQL injection. (Table and column
    	 * names cannot be bound as parameters, so they are still interpolated;
    	 * they come from entity field names, not user input.)
    	 *
    	 * @param table target table name
    	 * @param obj   entity providing column names and values
    	 * @return true if exactly the insert succeeded and affected a row
    	 */
    	public <T> boolean add(String table, T obj) {
    		StringHandle sh = new StringHandle();
    		EntityToString ets = new EntityToString();
    		String[] columns = sh.StringListToStringNlist(ets.getNameList(obj.getClass()));
    		String[] values = sh.StringListToStringNlist(ets.getStringListSingle(obj));
    		
    		// Nothing to insert, or columns/values out of sync — refuse early.
    		if (columns.length == 0 || values.length != columns.length) {
    			return false;
    		}
    		
    		// Build "insert into t(c1,c2,...) values(?,?,...)".
    		StringBuilder sql = new StringBuilder("insert into ").append(table).append('(');
    		sql.append(String.join(",", columns));
    		sql.append(") values(");
    		for (int i = 0; i < values.length; i++) {
    			sql.append(i == 0 ? "?" : ",?");
    		}
    		sql.append(')');
    		
    		//创建数据库链接
    		Connection conn = DBUtil.getConn();
    		if (conn == null) {
    			// Connection failed; the original would have thrown a
    			// NullPointerException on conn.createStatement() here.
    			return false;
    		}
    		java.sql.PreparedStatement state = null;
    		int affected = 0;
    		
    		try {
    			state = conn.prepareStatement(sql.toString());
    			for (int i = 0; i < values.length; i++) {
    				state.setString(i + 1, values[i]);
    			}
    			affected = state.executeUpdate();
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			//关闭连接
    			DBUtil.close(state, conn);
    		}
    		
    		return affected > 0;
    	}
    	
    }
    

      Service层 

    package Service;
    
    
    import java.util.List;
    import java.util.Map;
    
    import Dao.Dao;
    
    
    /**
     * CourseService
     * 服务层
     * @author HDQ
     *
     */
    /**
     * Service layer: a thin facade that forwards persistence calls to the Dao.
     *
     * @author HDQ
     */
    public class Service {
    
    	Dao dao = new Dao();
    	
    	/**
    	 * Persists {@code obj} as a new row of {@code table}.
    	 *
    	 * @param table target table name
    	 * @param obj   entity whose fields supply the row values
    	 * @return true if the underlying Dao reports a successful insert
    	 */
    	public <T> boolean add(String table, T obj) {
    		return dao.add(table, obj);
    	}
    	
    }
    

      运行结果:

    但是出现报错

    Table 'hive.data' doesn't exist

     但是我的表确实存在,还能够执行 show tables; 以及 select * from data;

     问题困扰暂未解决。初步分析:DBUtil 中加载的虽是 Hive 驱动,但连接串写的是 jdbc:mysql://localhost:3306/hive,实际连到的是 MySQL 中名为 hive 的数据库,而 data 表建在 Hive 里,因此报 Table 'hive.data' doesn't exist;应改用 jdbc:hive2://localhost:10000/… 形式的 HiveServer2 连接串。

  • 相关阅读:
    C# 让程序自动以管理员身份运行
    [转]SAP算号器 license key Developer Access Key 完美解决方案
    【原创】项目性能优化全纪录(一) 存储过程优化
    treeview的遍历
    .NET求职笔试题目(续)
    SQL server 数据同步 Merge 的一个小bug
    use Stored procedure return a tabel(存储过程返回table)
    四种sql server 数据库分页的测试
    十五个世界最顶级的技术类博客网站
    层的拖动与隐藏
  • 原文地址:https://www.cnblogs.com/zjl-0217/p/11853466.html
Copyright © 2011-2022 走看看