Project source: https://github.com/cw1322311203/hbasedemo/tree/master/hbase-mr-mysql
Goal: run a wordcount over the cell values of the HBase table student and write the result into MySQL.
Prerequisites:
- Add the MySQL JDBC driver to the Maven dependencies:

  <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.25</version>
  </dependency>
- If the MapReduce job fails with java.io.IOException: com.mysql.jdbc.Driver, the program usually cannot find the MySQL driver jar. The fix is to make the driver jar available to every tasktracker node that runs the MapReduce tasks.
  There are two ways to add the jar:
  - Put the jar into ${HADOOP_HOME}/lib on every node and restart the cluster. This is the more primitive approach.
    > For CDH, put it into /opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib
  - Ship the jar through the distributed cache. First upload it to the cluster:

    $ hadoop fs -put mysql-connector-java-5.1.0-bin.jar /hdfsPath/

    Then, before the MR program submits the job, add:

    DistributedCache.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf);
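    On Hadoop 2.x the DistributedCache class is deprecated; the same effect can be achieved through the Job API. A minimal sketch, assuming the jar was uploaded to the HDFS path above:

    // Sketch only: add the driver jar to every task's classpath via the Job API.
    job.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"));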
- The MySQL database and table must be created in advance.
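  A minimal DDL sketch for the target table: the database name (company) and the column names (name, sumcnt) come from the MysqlOutputFormat code below, while the column types are assumptions:

  CREATE DATABASE IF NOT EXISTS company DEFAULT CHARACTER SET utf8;
  USE company;
  CREATE TABLE IF NOT EXISTS statresult (
      name   VARCHAR(255),
      sumcnt INT
  );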
- To let MapReduce access relational databases (MySQL, Oracle) directly, Hadoop provides the DBInputFormat and DBOutputFormat classes: DBInputFormat reads database table rows into a MapReduce job (for example to land them in HDFS), and DBOutputFormat writes the result set produced by MapReduce back into a database table.
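  As a hedged sketch of that built-in alternative (this project instead uses the hand-written MysqlOutputFormat shown later), DBOutputFormat could be configured roughly as follows; the connection constants mirror the ones used in MysqlOutputFormat, and the reduce output key would additionally have to implement DBWritable:

  // Sketch only; classes from org.apache.hadoop.mapreduce.lib.db.
  DBConfiguration.configureDB(job.getConfiguration(),
          "com.mysql.jdbc.Driver",
          "jdbc:mysql://192.168.139.101:3306/company",
          "root", "123456");
  DBOutputFormat.setOutput(job, "statresult", "name", "sumcnt");
  job.setOutputFormatClass(DBOutputFormat.class);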
Full code:
Main class
package com.cw.bigdata.mr3;
import com.cw.bigdata.mr3.tool.HBase2MysqlTool;
import org.apache.hadoop.util.ToolRunner;
public class HBase2MysqlApplication {
public static void main(String[] args) throws Exception {
ToolRunner.run(new HBase2MysqlTool(), args);
}
}
Tool class
package com.cw.bigdata.mr3.tool;
import com.cw.bigdata.mr3.bean.CacheData;
import com.cw.bigdata.mr3.format.MysqlOutputFormat;
import com.cw.bigdata.mr3.mapper.ScanHbaseMapper;
import com.cw.bigdata.mr3.reducer.Hbase2MysqlReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.Tool;
public class HBase2MysqlTool implements Tool {
public int run(String[] args) throws Exception {
Job job = Job.getInstance();
job.setJarByClass(HBase2MysqlTool.class);
// mapper
TableMapReduceUtil.initTableMapperJob(
"student",
new Scan(),
ScanHbaseMapper.class,
Text.class,
CacheData.class,
job
);
// reducer
job.setReducerClass(Hbase2MysqlReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CacheData.class); // the reducer emits CacheData values; map output classes are already set by initTableMapperJob
job.setOutputFormatClass(MysqlOutputFormat.class);
return job.waitForCompletion(true) ? JobStatus.State.SUCCEEDED.getValue() : JobStatus.State.FAILED.getValue();
}
public void setConf(Configuration configuration) {
}
public Configuration getConf() {
return null;
}
}
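If only one column family or column of student should be counted, the Scan passed to initTableMapperJob can be narrowed and tuned before the job is submitted. A minimal sketch; the family and qualifier names ("info", "name") are placeholders, not taken from the project:

// Sketch only: restrict and tune the scan handed to initTableMapperJob.
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); // placeholder family/qualifier
scan.setCaching(500);        // fetch more rows per RPC round trip
scan.setCacheBlocks(false);  // recommended for MapReduce scans over HBase
TableMapReduceUtil.initTableMapperJob("student", scan, ScanHbaseMapper.class,
        Text.class, CacheData.class, job);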
Mapper class
package com.cw.bigdata.mr3.mapper;
import com.cw.bigdata.mr3.bean.CacheData;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import java.io.IOException;
public class ScanHbaseMapper extends TableMapper<Text, CacheData> {
@Override
protected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
for (Cell cell : result.rawCells()) {
String name = Bytes.toString(CellUtil.cloneValue(cell));
CacheData data = new CacheData();
data.setName(name);
data.setCount(1);
System.out.println(name);
context.write(new Text(name), data);
}
}
}
Reducer class
package com.cw.bigdata.mr3.reducer;
import com.cw.bigdata.mr3.bean.CacheData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Hbase2MysqlReducer extends Reducer<Text, CacheData, Text, CacheData> {
@Override
protected void reduce(Text key, Iterable<CacheData> datas, Context context) throws IOException, InterruptedException {
int sum = 0;
for (CacheData data : datas) {
sum += data.getCount();
}
CacheData sumData = new CacheData();
sumData.setName(key.toString());
sumData.setCount(sum);
System.err.println(sumData.getName() + ":" + sumData.getCount());
context.write(key, sumData);
}
}
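Because the reduce step is a plain sum, the same class could in principle also be registered as a combiner in HBase2MysqlTool to cut shuffle traffic; this is an untested suggestion, not part of the original driver:

// Optional (suggestion only): reuse the reducer as a combiner in run().
job.setCombinerClass(Hbase2MysqlReducer.class);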
Bean class
package com.cw.bigdata.mr3.bean;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class CacheData implements WritableComparable<CacheData> {
private String name;
private int count;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public int compareTo(CacheData data) {
return name.compareTo(data.name);
}
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(count);
}
public void readFields(DataInput in) throws IOException {
name = in.readUTF();
count = in.readInt();
}
}
OutputFormat class
package com.cw.bigdata.mr3.format;
import com.cw.bigdata.mr3.bean.CacheData;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class MysqlOutputFormat extends OutputFormat<Text, CacheData> {
class MysqlRecordWriter extends RecordWriter<Text, CacheData> {
private static final String MYSQL_DRIVER_CLASS = "com.mysql.jdbc.Driver";
//private static final String MYSQL_URL = "jdbc:mysql://localhost:3306/company?useUnicode=true&characterEncoding=UTF-8";
private static final String MYSQL_URL = "jdbc:mysql://192.168.139.101:3306/company?useUnicode=true&characterEncoding=UTF-8";
private static final String MYSQL_USERNAME = "root";
private static final String MYSQL_PASSWORD = "123456";
private Connection connection;
public MysqlRecordWriter() {
try {
Class.forName(MYSQL_DRIVER_CLASS);
connection = DriverManager.getConnection(MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD);
} catch (Exception e) {
e.printStackTrace();
}
}
public void write(Text key, CacheData data) throws IOException, InterruptedException {
String sql = "insert into statresult (name,sumcnt) values(?,?)";
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setObject(1, key.toString());
preparedStatement.setObject(2, data.getCount());
preparedStatement.executeUpdate();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
public RecordWriter<Text, CacheData> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return new MysqlRecordWriter();
}
public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
}
private FileOutputCommitter committer = null;
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
if (committer == null) {
Path output = getOutputPath(context);
committer = new FileOutputCommitter(output, context);
}
return committer;
}
public static Path getOutputPath(JobContext job) {
String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);
return name == null ? null : new Path(name);
}
}