需求
两张表,一张click表记录某广告某一天的点击量,另一张total_click表记录某广告的总点击量
建表
CREATE TABLE `click` ( `id` int(20) NOT NULL AUTO_INCREMENT, `ad_id` int(20) DEFAULT NULL, -- 广告ID `click_num` int(30) DEFAULT NULL, -- 某天的点击数量 `day` date, PRIMARY KEY (`id`) ); CREATE TABLE `total_click` ( `id` int(20) NOT NULL AUTO_INCREMENT, `ad_id` int(20) DEFAULT NULL, -- 广告ID `total_click_num` int(50) DEFAULT NULL, -- 总点击数量 PRIMARY KEY (`id`) )
pom依赖
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.45</version> </dependency> </dependencies>
代码
自定义类
Writable是为了与MapReduce进行对接,而DBWritable是为了与MySQL进行对接。
import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; public class MyDBWritable implements DBWritable, Writable { private String ad_id; private int click_num; private int total_click_num; public MyDBWritable(){ } public MyDBWritable(String name, int age) { this.ad_id = name; this.click_num = age; this.total_click_num = total_click_num; } public void write(DataOutput out) throws IOException { out.writeUTF(ad_id); out.writeInt(click_num); out.writeInt(total_click_num); } //写数据的过程 public void write(PreparedStatement statement) throws SQLException { //要和SQL_Run类的DBOutputFormat.setOutput(job,"total_click","ad_id","total_click_num")语句里字段的顺序保持一致 statement.setString(1,ad_id); statement.setInt(2, total_click_num); } //读数据的过程 public void readFields(ResultSet resultSet) throws SQLException { ad_id =resultSet.getString(1); click_num =resultSet.getInt(2); } public void readFields(DataInput in) throws IOException { ad_id =in.readUTF(); click_num =in.readInt(); total_click_num =in.readInt(); } public String getAd_id() { return ad_id; } public void setAd_id(String ad_id) { this.ad_id = ad_id; } public int getClick_num() { return click_num; } public void setClick_num(int click_num) { this.click_num = click_num; } public int getTotal_click_num() { return total_click_num; } public void setTotal_click_num(int total_click_num) { this.total_click_num = total_click_num; } }
Map
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class SQLMapper extends Mapper<LongWritable,MyDBWritable,Text,IntWritable> { @Override protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException { context.write(new Text(value.getAd_id()),new IntWritable(value.getClick_num())); } }
Reduce
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class SQLReducer extends Reducer<Text,IntWritable,MyDBWritable,NullWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int total = 0; for(IntWritable i :values) { total+= i.get(); } MyDBWritable myDBWritable = new MyDBWritable(); myDBWritable.setAd_id(key.toString()); myDBWritable.setTotal_click_num(total); context.write(myDBWritable,NullWritable.get()); } }
App
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; public class SQL_Run { public static void main(String[] args) throws Exception { Configuration conf=new Configuration(); //假如是本地测试,需要设置fs.defaultFS conf.set("fs.defaultFS","file:///"); Job job = Job.getInstance(conf); FileSystem fs=FileSystem.get(conf); job.setJobName("SQL_TEST"); job.setJarByClass(SQL_Run.class); job.setMapperClass(SQLMapper.class); job.setReducerClass(SQLReducer.class); //配置数据库信息 String driveclass="com.mysql.jdbc.Driver"; String url="jdbc:mysql://192.168.0.8:3306/bigdata"; String username="root"; String password="123456"; DBConfiguration.configureDB(job.getConfiguration(),driveclass,url,username,password); //设置数据库输入 //需要通过总的记录数来计算切片 DBInputFormat.setInput(job,MyDBWritable.class,"select ad_id,click_num from click","select count(id) from click"); //设置数据库输出 //total_click是表名,后面参数是字段值(可以多个) DBOutputFormat.setOutput(job,"total_click","ad_id","total_click_num"); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(MyDBWritable.class); job.setOutputValueClass(NullWritable.class); job.waitForCompletion(true); } }