  • Reading and writing MySQL from Hadoop

    Requirements

    There are two tables: a click table that records how many clicks an ad received on a given day, and a total_click table that records each ad's total click count. The MapReduce job reads the daily rows from click, sums them per ad, and writes one total row per ad into total_click.

    Creating the tables

    CREATE TABLE `click` (
      `id` int(20) NOT NULL AUTO_INCREMENT,
      `ad_id` int(20) DEFAULT NULL, -- ad ID
      `click_num` int(30) DEFAULT NULL, -- number of clicks on that day
      `day` date,
      PRIMARY KEY (`id`)
    );
    
    CREATE TABLE `total_click` (
      `id` int(20) NOT NULL AUTO_INCREMENT,
      `ad_id` int(20) DEFAULT NULL, -- ad ID
      `total_click_num` int(50) DEFAULT NULL, -- total number of clicks
      PRIMARY KEY (`id`)
    );
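
    To try the job end to end it helps to seed a few rows of test data first. Below is a minimal, hypothetical JDBC sketch (the class name SeedClicks and the sample values are made up; the connection settings are the ones used by the driver class later in this post):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;


    public class SeedClicks {
        public static void main(String[] args) throws Exception {
            // Same connection settings as the SQL_Run driver class below
            String url = "jdbc:mysql://192.168.0.8:3306/bigdata";
            try (Connection conn = DriverManager.getConnection(url, "root", "123456");
                 PreparedStatement ps = conn.prepareStatement(
                         "INSERT INTO click (ad_id, click_num, day) VALUES (?, ?, ?)")) {
                // Hypothetical sample rows: ad 1 is clicked on two days, ad 2 on one
                int[][] rows = {{1, 10}, {1, 5}, {2, 7}};
                String[] days = {"2019-08-21", "2019-08-22", "2019-08-21"};
                for (int i = 0; i < rows.length; i++) {
                    ps.setInt(1, rows[i][0]);
                    ps.setInt(2, rows[i][1]);
                    ps.setDate(3, java.sql.Date.valueOf(days[i]));
                    ps.executeUpdate();
                }
            }
        }
    }

    With this data, the job should produce total_click rows of ad 1 → 15 and ad 2 → 7.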

    pom dependencies

    <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.3</version>
            </dependency>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
            </dependency>
    
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.45</version>
            </dependency>
        </dependencies>

    Code

    Custom class

    Writable exists to interface with MapReduce (it handles Hadoop's serialization between map and reduce), while DBWritable exists to interface with MySQL (it handles reading records from and writing records to the database).

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    
    public class MyDBWritable implements DBWritable, Writable {
        private String ad_id;
        private int click_num;
        private int total_click_num;
    
    
        public MyDBWritable() {
        }

        public MyDBWritable(String ad_id, int click_num, int total_click_num) {
            this.ad_id = ad_id;
            this.click_num = click_num;
            this.total_click_num = total_click_num;
        }
    
        // Hadoop serialization: writes the fields shuffled between map and reduce
        public void write(DataOutput out) throws IOException {
            out.writeUTF(ad_id);
            out.writeInt(click_num);
            out.writeInt(total_click_num);
        }
    
        // Writing a record out to MySQL
        public void write(PreparedStatement statement) throws SQLException {
            // The parameter order must match the field list in DBOutputFormat.setOutput(job, "total_click", "ad_id", "total_click_num") in SQL_Run
            statement.setString(1,ad_id);
            statement.setInt(2, total_click_num);
        }
    
        // Reading a record in from MySQL
        public void readFields(ResultSet resultSet) throws SQLException {
            // The column order matches the input query: select ad_id,click_num from click
            ad_id =resultSet.getString(1);
            click_num =resultSet.getInt(2);
        }
    
        // Hadoop deserialization: the counterpart of write(DataOutput)
        public void readFields(DataInput in) throws IOException {
            ad_id =in.readUTF();
            click_num =in.readInt();
            total_click_num =in.readInt();
        }
    
        public String getAd_id() {
            return ad_id;
        }
    
        public void setAd_id(String ad_id) {
            this.ad_id = ad_id;
        }
    
        public int getClick_num() {
            return click_num;
        }
    
        public void setClick_num(int click_num) {
            this.click_num = click_num;
        }
    
        public int getTotal_click_num() {
            return total_click_num;
        }
    
        public void setTotal_click_num(int total_click_num) {
            this.total_click_num = total_click_num;
        }
    
    }

    Map

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    
    public class SQLMapper extends Mapper<LongWritable,MyDBWritable,Text,IntWritable> {
        @Override
        protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
            // One input record per row of the click table: emit (ad_id, daily click count)
            context.write(new Text(value.getAd_id()), new IntWritable(value.getClick_num()));
        }
    
    }

    Reduce

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    
    public class SQLReducer extends Reducer<Text,IntWritable,MyDBWritable,NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the daily click counts for this ad
            int total = 0;
            for (IntWritable i : values) {
                total += i.get();
            }
            MyDBWritable myDBWritable = new MyDBWritable();
            myDBWritable.setAd_id(key.toString());
            myDBWritable.setTotal_click_num(total);
            context.write(myDBWritable,NullWritable.get());
        }
    }
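
    Since the summation in the reducer is associative and commutative, a combiner can pre-aggregate the per-ad counts on the map side and cut shuffle traffic. The sketch below is an optional addition, not part of the original job; note that SQLReducer itself cannot be reused as the combiner, because its output types (MyDBWritable, NullWritable) differ from the map output types:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;


    public class SQLCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Partial per-ad sum; a combiner's output types must equal its input types
            int subtotal = 0;
            for (IntWritable i : values) {
                subtotal += i.get();
            }
            context.write(key, new IntWritable(subtotal));
        }
    }

    To enable it, add job.setCombinerClass(SQLCombiner.class); in the driver.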

    App

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    
    
    public class SQL_Run {
        public static void main(String[] args) throws Exception {
    
            Configuration conf=new Configuration();
    
        // For a local test, fs.defaultFS needs to point at the local file system
            conf.set("fs.defaultFS","file:///");
    
            Job job = Job.getInstance(conf);
    
    
            job.setJobName("SQL_TEST");
            job.setJarByClass(SQL_Run.class);
            job.setMapperClass(SQLMapper.class);
            job.setReducerClass(SQLReducer.class);
    
        // Configure the database connection
        String driverClass = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://192.168.0.8:3306/bigdata";
        String username = "root";
        String password = "123456";
        DBConfiguration.configureDB(job.getConfiguration(), driverClass, url, username, password);
    
        // Configure the database input: the first query fetches the records,
        // the count query is needed so the framework can compute input splits from the total record count
        DBInputFormat.setInput(job, MyDBWritable.class, "select ad_id,click_num from click", "select count(id) from click");
    
        // Configure the database output: "total_click" is the table name, the remaining arguments are the field names (one or more)
        DBOutputFormat.setOutput(job, "total_click", "ad_id", "total_click_num");
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setOutputKeyClass(MyDBWritable.class);
            job.setOutputValueClass(NullWritable.class);
    
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
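
    One caveat: DBOutputFormat only issues INSERT statements, so re-running the job appends duplicate totals to total_click. Below is a hypothetical helper for clearing the table before a run and inspecting the results afterwards (the class name TotalClickUtil is made up; the connection settings are the same as in the driver):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;


    public class TotalClickUtil {
        // Run with the argument "reset" before the job, or with no argument after it
        public static void main(String[] args) throws Exception {
            String url = "jdbc:mysql://192.168.0.8:3306/bigdata";
            try (Connection conn = DriverManager.getConnection(url, "root", "123456");
                 Statement st = conn.createStatement()) {
                if (args.length > 0 && "reset".equals(args[0])) {
                    // DBOutputFormat appends rows rather than upserting, so clear old totals first
                    st.executeUpdate("TRUNCATE TABLE total_click");
                } else {
                    try (ResultSet rs = st.executeQuery("SELECT ad_id, total_click_num FROM total_click")) {
                        while (rs.next()) {
                            System.out.println("ad " + rs.getInt(1) + " -> " + rs.getInt(2));
                        }
                    }
                }
            }
        }
    }

    When the job runs on a real cluster rather than locally, the MySQL connector jar also has to be shipped with the job (for example via -libjars) so the tasks can load com.mysql.jdbc.Driver.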