zoukankan      html  css  js  c++  java
  • mapreduce从数据库中分析数据,并把分析结果写入数据库中

    创建类

    package myTest;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    // Use the new-API DBWritable (mapreduce.lib.db) so this record type is accepted by
    // org.apache.hadoop.mapreduce.lib.db.DBInputFormat used by the Job-based driver.
    // The original imported the deprecated org.apache.hadoop.mapred.lib.db.DBWritable,
    // which belongs to the old API and is inconsistent with FlowBean and SunDB.
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    
    /**
     * One row of the source database table: (title, author, numberRead, url).
     *
     * <p>Implements {@link Writable} for Hadoop's wire serialization between map and
     * reduce tasks, and {@link DBWritable} so DBInputFormat can populate it from a
     * JDBC {@link ResultSet} and DBOutputFormat could write it via a
     * {@link PreparedStatement}. Field order in read/write methods must stay in sync.
     */
    public class DBRecord implements Writable, DBWritable {
        private String title;
        private String author;
        private long numberRead;
        private String url;
        
        /** No-arg constructor required by Hadoop's reflection-based deserialization. */
        public DBRecord() {
            
        }
        
        public DBRecord(String title, String author, long numberRead, String url) {
            this.title = title;
            this.author = author;
            this.numberRead = numberRead;
            this.url = url;
        }
        
        public String getTitle() {
            return title;
        }
        
        public void setTitle(String title) {
            this.title = title;
        }
        
        public String getAuthor() {
            return author;
        }
        
        public void setAuthor(String author) {
            this.author = author;
        }
        
        public long getNumberRead() {
            return numberRead;
        }
        
        public void setNumberRead(long numberRead) {
            this.numberRead = numberRead;
        }
        
        public String getUrl() {
            return url;
        }
        
        public void setUrl(String url) {
            this.url = url;
        }
        
        /** Populates this record from a JDBC result set (called by DBInputFormat). */
        @Override
        public void readFields(ResultSet set) throws SQLException {
            this.title = set.getString("title");
            this.author = set.getString("author");
            this.numberRead = set.getLong("numberRead");
            this.url = set.getString("url");
        }
        
        /** Binds this record's fields to the insert statement, in column order. */
        @Override
        public void write(PreparedStatement pst) throws SQLException {
            pst.setString(1, title);
            pst.setString(2, author);
            pst.setLong(3, numberRead);
            pst.setString(4, url);
        }
        
        /** Deserializes fields from the stream; order must match {@link #write(DataOutput)}. */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.title = Text.readString(in);
            this.author = Text.readString(in);
            this.numberRead = in.readLong();
            this.url = Text.readString(in);
        }
        
        /** Serializes fields to the stream; order must match {@link #readFields(DataInput)}. */
        @Override
        public void write(DataOutput out) throws IOException {
            Text.writeString(out, this.title);
            Text.writeString(out, this.author);
            out.writeLong(this.numberRead);
            Text.writeString(out, this.url);
        }
        
        @Override
        public String toString() {
            return "title: " + this.title + " author: " + this.author + " numberRead: " + this.numberRead + " url: " + this.url;  
        }
    }

    创建类

    package flowsum;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    
    /**
     * Aggregated per-author statistics: (userName, numberRead, numberArticle).
     *
     * <p>Serves as both the map-output value and the reduce-output key written to the
     * database via DBOutputFormat. Sorts in descending order of {@code numberRead}.
     */
    public class FlowBean implements WritableComparable<FlowBean>, Writable, DBWritable {
    
        private String userName;
        private long numberRead;
        private long numberArticle;
        
        // Hadoop's reflection-based deserialization needs a no-arg constructor,
        // so one is defined explicitly.
        public FlowBean() {}
        
        public FlowBean(String userName, long numberRead, long numberArticle) {
            this.userName = userName;
            this.numberRead = numberRead;
            this.numberArticle = numberArticle;
        }
        
        public String getUserName() {
            return userName;
        }
        
        public void setUserName(String userName) {
            this.userName = userName;
        }
        
        public long getNumberRead() {
            return numberRead;
        }
        
        public void setNumberRead(long numberRead) {
            this.numberRead = numberRead;
        }
        
        public long getNumberArticle() {
            return numberArticle;
        }
        
        public void setNumberArticle(long numberArticle) {
            this.numberArticle = numberArticle;
        }
        
        // Deserializes the object's fields from the stream.
        // The read order must match the order used in write(DataOutput).
        @Override
        public void readFields(DataInput arg0) throws IOException {
            userName = arg0.readUTF();
            numberRead = arg0.readLong();
            numberArticle = arg0.readLong();
        }
        
        // Serializes the object's fields to the stream.
        @Override
        public void write(DataOutput arg0) throws IOException {
            arg0.writeUTF(userName);
            arg0.writeLong(numberRead);
            arg0.writeLong(numberArticle);
        }
        
        /**
         * Orders beans by {@code numberRead}, descending.
         *
         * <p>Bug fix: the original {@code numberRead > o.getNumberRead() ? -1 : 1}
         * never returned 0 and returned 1 from both {@code a.compareTo(b)} and
         * {@code b.compareTo(a)} when the counts were equal, violating the
         * {@link Comparable} contract (sign antisymmetry) and making sort results
         * undefined. {@link Long#compare} restores a total order.
         */
        @Override
        public int compareTo(FlowBean o) {
            return Long.compare(o.getNumberRead(), this.numberRead);
        }
        
        @Override
        public String toString() {
            return userName + "\t" + numberRead + "\t" + numberArticle;
        }
        
        /** Populates this bean from a JDBC result set (column order: name, reads, articles). */
        @Override
        public void readFields(ResultSet arg0) throws SQLException {
            this.userName = arg0.getString(1);
            this.numberRead = arg0.getLong(2);
            this.numberArticle = arg0.getLong(3);
        }
        
        /** Binds this bean's fields to the insert statement used by DBOutputFormat. */
        @Override
        public void write(PreparedStatement arg0) throws SQLException {
            arg0.setString(1, this.userName);
            arg0.setLong(2, this.numberRead);
            arg0.setLong(3, this.numberArticle);
        }
        
    }
    package student;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    // Bug fix: the original imported the old-API org.apache.hadoop.mapred.lib.db
    // DBInputFormat/DBOutputFormat, which do not extend the new-API InputFormat/
    // OutputFormat and therefore cannot be passed to Job.setInputFormatClass or
    // DBOutputFormat.setOutput(Job, ...). The mapreduce.lib.db variants are required.
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    
    import flowsum.FlowBean;
    import myTest.DBRecord;
    
    /**
     * MapReduce driver that reads rows from the MySQL table "sourcess", aggregates
     * read counts and article counts per author, and writes the totals into the
     * MySQL table "flow" via DBOutputFormat.
     */
    public class SunDB {
    
        /** Emits (author, FlowBean(author, numberRead, 1)) for each input row. */
        public static class SunMapper extends Mapper<LongWritable, DBRecord, Text, FlowBean> {
            // Reused output key — avoids allocating a Text per record.
            private final Text outKey = new Text();
    
            @Override
            protected void map(LongWritable key, DBRecord value,
                    Mapper<LongWritable, DBRecord, Text, FlowBean>.Context context)
                    throws IOException, InterruptedException {
                String userName = value.getAuthor();
                long numberRead = value.getNumberRead();
                // numberArticle = 1: each source row represents one article.
                FlowBean bean = new FlowBean(userName, numberRead, 1);
                outKey.set(userName);
                context.write(outKey, bean);
            }
        }
        
        /** Sums reads and article counts per author; the FlowBean key is what DBOutputFormat stores. */
        public static class SunReducer extends Reducer<Text, FlowBean, FlowBean, Text> {
            @Override
            protected void reduce(Text arg0, Iterable<FlowBean> arg1, Reducer<Text, FlowBean, FlowBean, Text>.Context arg2)
                    throws IOException, InterruptedException {
                long numberReadCount = 0;
                long numberArticleCount = 0;
                for (FlowBean bean : arg1) {
                    numberReadCount += bean.getNumberRead();
                    numberArticleCount += bean.getNumberArticle();
                }
                FlowBean bean = new FlowBean(arg0.toString(), numberReadCount, numberArticleCount);
                // DBOutputFormat writes the KEY to the database; the value is ignored.
                arg2.write(bean, new Text());
            }
        }
        
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration configuration = new Configuration();
            // Bug fix: the original URL contained "??" before the query string,
            // which is not a valid JDBC URL separator.
            DBConfiguration.configureDB(configuration,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://127.0.0.1:3306/sun_test?characterEncoding=utf8&useSSL=false",
                    "root", "Sun@123");
            String[] fields = {"title", "author", "numberRead", "url"};
            // Job.getInstance replaces the deprecated new Job(Configuration, String).
            Job job = Job.getInstance(configuration, "yaya");
            job.setJarByClass(SunDB.class);
            job.setInputFormatClass(DBInputFormat.class);
            DBInputFormat.setInput(job, DBRecord.class, "sourcess", null, null, fields);
            job.setMapperClass(SunMapper.class);
            job.setReducerClass(SunReducer.class);
            
            job.setNumReduceTasks(1);
            // Bug fix: map output types (Text, FlowBean) differ from the final output
            // types (FlowBean, Text), so both pairs must be declared explicitly.
            // The original set only setOutputKeyClass(Text)/setOutputValueClass(FlowBean),
            // which is inverted relative to the reducer's output signature.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            job.setOutputKeyClass(FlowBean.class);
            job.setOutputValueClass(Text.class);
            DBOutputFormat.setOutput(job, "flow", "userName", "numberRead", "numberArticle");
            // Propagate the job result as the process exit code instead of ignoring it.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
  • 相关阅读:
    Java笔记(06):如何使用JDK提供的帮助文档
    Java笔记(05):面向对象--继承
    MySql:基本SQL
    Oracle:简单SQL程序、存储过程、触发器
    Oracle:批量操作、视图、序列、简单SQL程序
    力扣(LeetCode)两整数之和 个人题解
    力扣(LeetCode)买卖股票的最佳时机 个人题解
    力扣(LeetCode)环形链表 个人题解
    力扣(LeetCode)找不同 个人题解
    力扣(LeetCode)从不订购的客户-数据库题 个人题解
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6362828.html
Copyright © 2011-2022 走看看