zoukankan      html  css  js  c++  java
  • Hadoop读写mysql

    需求

    两张表,一张click表记录某广告某一天的点击量,另一张total_click表记录某广告的总点击量

    建表

    CREATE TABLE `click` (
      `id` int(20) NOT NULL AUTO_INCREMENT,
      `ad_id` int(20) DEFAULT NULL, -- advertisement ID
      `click_num` int(30) DEFAULT NULL, -- number of clicks on the given day
      `day` date,
      PRIMARY KEY (`id`)
    );
    
    CREATE TABLE `total_click` (
      `id` int(20) NOT NULL AUTO_INCREMENT,
      `ad_id` int(20) DEFAULT NULL, -- advertisement ID
      `total_click_num` int(50) DEFAULT NULL, -- aggregated total click count
      PRIMARY KEY (`id`)
    ); -- BUG FIX: statement was missing its terminating semicolon

    pom依赖

    <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.3</version>
            </dependency>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
            </dependency>
    
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.45</version>
            </dependency>
        </dependencies>

    代码

    自定义类

    Writable是为了与MapReduce进行对接,而DBWritable是为了与MySQL进行对接。

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    
    public class MyDBWritable implements DBWritable, Writable {
        private String ad_id;
        private int click_num;
        private int total_click_num;
    
    
        public MyDBWritable(){
    
        }
        public MyDBWritable(String name, int age) {
            this.ad_id = name;
            this.click_num = age;
            this.total_click_num = total_click_num;
        }
    
        public void write(DataOutput out) throws IOException {
            out.writeUTF(ad_id);
            out.writeInt(click_num);
            out.writeInt(total_click_num);
        }
    
        //写数据的过程
        public void write(PreparedStatement statement) throws SQLException {
            //要和SQL_Run类的DBOutputFormat.setOutput(job,"total_click","ad_id","total_click_num")语句里字段的顺序保持一致
            statement.setString(1,ad_id);
            statement.setInt(2, total_click_num);
        }
    
        //读数据的过程
        public void readFields(ResultSet resultSet) throws SQLException {
            ad_id =resultSet.getString(1);
            click_num =resultSet.getInt(2);
        }
    
        public void readFields(DataInput in) throws IOException {
            ad_id =in.readUTF();
            click_num =in.readInt();
            total_click_num =in.readInt();
        }
    
        public String getAd_id() {
            return ad_id;
        }
    
        public void setAd_id(String ad_id) {
            this.ad_id = ad_id;
        }
    
        public int getClick_num() {
            return click_num;
        }
    
        public void setClick_num(int click_num) {
            this.click_num = click_num;
        }
    
        public int getTotal_click_num() {
            return total_click_num;
        }
    
        public void setTotal_click_num(int total_click_num) {
            this.total_click_num = total_click_num;
        }
    
    }

    Map

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    
    /**
     * Mapper stage: for every row read from the `click` table, emits one
     * (ad_id, click_num) pair so the reducer can sum per advertisement.
     */
    public class SQLMapper extends Mapper<LongWritable, MyDBWritable, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, MyDBWritable value, Context context)
                throws IOException, InterruptedException {
            Text adId = new Text(value.getAd_id());
            IntWritable dailyClicks = new IntWritable(value.getClick_num());
            context.write(adId, dailyClicks);
        }
    }

    Reduce

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    
    /**
     * Reducer stage: sums the daily click counts for each ad_id and emits one
     * MyDBWritable per key; the NullWritable value slot is unused because the
     * whole output record lives in the key.
     */
    public class SQLReducer extends Reducer<Text, IntWritable, MyDBWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable dailyCount : values) {
                sum += dailyCount.get();
            }

            MyDBWritable record = new MyDBWritable();
            record.setAd_id(key.toString());
            record.setTotal_click_num(sum);
            context.write(record, NullWritable.get());
        }
    }

    App

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    
    
    /**
     * Job driver: reads per-day ad clicks from MySQL table `click`,
     * aggregates them per advertisement, and writes the totals into
     * table `total_click`.
     */
    public class SQL_Run {
        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();

            // For a local (non-HDFS) test run.
            conf.set("fs.defaultFS", "file:///");

            Job job = Job.getInstance(conf);

            job.setJobName("SQL_TEST");
            job.setJarByClass(SQL_Run.class);
            job.setMapperClass(SQLMapper.class);
            job.setReducerClass(SQLReducer.class);

            // JDBC connection settings.
            // NOTE(review): credentials are hard-coded; move them to configuration
            // or environment variables before using this outside a demo.
            String driverClass = "com.mysql.jdbc.Driver";
            String url = "jdbc:mysql://192.168.0.8:3306/bigdata";
            String username = "root";
            String password = "123456";
            DBConfiguration.configureDB(job.getConfiguration(), driverClass, url, username, password);

            // Input: the second query (row count) is what Hadoop uses to compute
            // input splits over the table.
            DBInputFormat.setInput(job, MyDBWritable.class,
                    "select ad_id,click_num from click",
                    "select count(id) from click");

            // Output: table name followed by the columns to insert; the column
            // order must match MyDBWritable.write(PreparedStatement).
            DBOutputFormat.setOutput(job, "total_click", "ad_id", "total_click_num");

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setOutputKeyClass(MyDBWritable.class);
            job.setOutputValueClass(NullWritable.class);

            // BUG FIX: the original ignored the job result and created an unused
            // FileSystem.get(conf); exit non-zero on failure so callers/schedulers
            // can detect it.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
  • 相关阅读:
    django之数据库orm
    Python的迭代器和生成器
    xhprof 安装使用
    http_load
    sysbench
    LINUX系统下MySQL 压力测试工具super smack
    apache ab工具
    关于流量升高导致TIME_WAIT增加,MySQL连接大量失败的问题
    mysql5.6优化
    php-fpm超时时间设置request_terminate_timeout分析
  • 原文地址:https://www.cnblogs.com/Alcesttt/p/11395235.html
Copyright © 2011-2022 走看看