    Big Data Case Study: Sogou Search Log Analysis

    Documentation link:

    https://pan.baidu.com/s/1Eq85aWfSUXTCqk5EKo8zPQ

    Data link:

    https://pan.baidu.com/s/1Y7qQPjBaAvLnnCQPFVvR4Q

    1. Data preprocessing

    Extension script (appends year, month, day and hour fields):

    vim log-extend.sh

    #! /bin/bash

    #infile=/home/sogou.500w.utf8

    infile=$1

    #outfile=/home/sogou_log.txt

    outfile=$2

    # field 1 is a yyyyMMddHHmmss timestamp; awk substr() is 1-based, so year/month/day/hour start at positions 1, 5, 7 and 9

    awk -F ' ' '{print $0" "substr($1,1,4)" "substr($1,5,2)" "substr($1,7,2)" "substr($1,9,2)}' $infile > $outfile

    [root@master ~]# bash log-extend.sh sogou.500w.utf8 sogou_log.txt

    Filter script (drops records whose query field is empty):

    vim log-filter.sh

    #!/bin/bash

    #infile=/home/sogou_log.txt

    infile=$1

    #outfile=/home/sogou_log.txt.flt

    outfile=$2

    awk -F " " '{if($2 != "" && $3 != "" && $2 != " " && $3 != " ") print $0}' $infile > $outfile

    [root@master ~]# bash log-filter.sh sogou_log.txt  sogou_log.txt.flt
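    As a quick sanity check (not part of the original write-up), comparing the line counts of the two files shows how many empty-query records were dropped:

    [root@master ~]# wc -l sogou_log.txt sogou_log.txt.flt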

           

                  

    2. Upload the filtered file to HDFS
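    The upload commands themselves are not shown in the original; a minimal sketch, with the HDFS paths taken from the Hive LOAD statement and the MapReduce drivers below (LOAD DATA INPATH later moves the /home copy into the Hive warehouse, so the copy under /sougou is the one the MapReduce jobs keep reading):

    [root@master ~]# hdfs dfs -mkdir -p /home /sougou

    [root@master ~]# hdfs dfs -put sogou_log.txt.flt /home/

    [root@master ~]# hdfs dfs -put sogou_log.txt.flt /sougou/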

    Build a data warehouse for the log data on Hive

    Create the database:

    hive> create database sogou;

    Switch to it:

    hive> use sogou;

    Create an external table that includes the 4 extended fields (year, month, day, hour):

    hive> CREATE EXTERNAL TABLE sogou_data(
        >   ts string,
        >   uid string,
        >   keyword string,
        >   rank int,
        >   sorder int,
        >   url string,
        >   year int,
        >   month int,
        >   day int,
        >   hour int)
        > ROW FORMAT DELIMITED
        > FIELDS TERMINATED BY ' '
        > STORED AS TEXTFILE;

    OK

    Time taken: 0.412 seconds

     

     

    Load the data into the Hive table:

    load data inpath '/home/sogou_log.txt.flt' into table sogou_data;

     

     

     

    Create a partitioned table:

    hive> CREATE EXTERNAL TABLE sogou_partitioned_data(
        >   ts string,
        >   uid string,
        >   keyword string,
        >   rank int,
        >   sorder int,
        >   url string)
        > PARTITIONED BY(year int,month int,day int,hour int)
        > ROW FORMAT DELIMITED
        > FIELDS TERMINATED BY ' '
        > STORED AS TEXTFILE;

     

     

     

     

     

    Enable dynamic partitioning and populate the partitioned table:

    hive> set hive.exec.dynamic.partition.mode=nonstrict;

    hive> INSERT OVERWRITE TABLE sogou_partitioned_data partition(year,month,day,hour) SELECT * FROM sogou_data;
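    As an optional check (not in the original), the partitions created by the dynamic insert can be listed:

    hive> show partitions sogou_partitioned_data;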

    Test queries:

    hive> select * from sogou_data limit 10;

    hive> select * from sogou_data where uid='6961d0c97fe93701fc9c0d861d096cd9';

    Data analysis requirement 1: basic statistics

    (1) Total number of records

    hive> select count(*) from sogou_partitioned_data;

    OK

    5000000

    (2) Number of records with a non-empty query

    hive> select count(*) from sogou_partitioned_data where keyword is not null and keyword!='';

    5000000

    Time taken: 28.606 seconds, Fetched: 1 row(s)

    (3) Number of unique (non-duplicated) records

    hive> select count(*) from(select count(*) as no_repeat_count from sogou_partitioned_data group by ts,uid,keyword,url having no_repeat_count=1) a;

    OK

    4999272

    Time taken: 101.228 seconds, Fetched: 1 row(s)

    (4) Number of distinct UIDs

    hive> select count(distinct(uid)) from sogou_partitioned_data;

    OK

    1352664

    Time taken: 44.639 seconds, Fetched: 1 row(s)

    Data analysis requirement 2: keyword analysis

    (1) Query frequency ranking (the 50 most frequent keywords)

    hive> select keyword,count(*)query_count from sogou_partitioned_data group by keyword order by query_count desc limit 50;

    Total MapReduce CPU Time Spent: 1 minutes 4 seconds 510 msec

    OK

    百度 38441

    baidu 18312

    人体艺术 14475

    4399小游戏 11438

    qq空间 10317

    优酷 10158

    新亮剑 9654

    馆陶县县长闫宁的父亲 9127

    公安卖萌 8192

    百度一下 你就知道 7505

    百度一下 7104

    4399 7041

    魏特琳 6665

    qq网名 6149

    7k7k小游戏 5985

    黑狐 5610

    儿子与母亲不正当关系 5496

    新浪微博 5369

    李宇春体 5310

    新疆暴徒被击毙图片 4997

    hao123 4834

    123 4829

    4399洛克王国 4112

    qq头像 4085

    nba 4027

    龙门飞甲 3917

    qq个性签名 3880

    张去死 3848

    cf官网 3729

    凰图腾 3632

    快播 3423

    金陵十三钗 3349

    吞噬星空 3330

    dnf官网 3303

    武动乾坤 3232

    新亮剑全集 3210

    电影 3155

    优酷网 3115

    两次才处决美女罪犯 3106

    电影天堂 3028

    土豆网 2969

    qq分组 2940

    全国各省最低工资标准 2872

    清代姚明 2784

    youku 2783

    争产案 2755

    dnf 2686

    12306 2682

    身份证号码大全 2680

    火影忍者 2604

    Time taken: 119.195 seconds, Fetched: 50 row(s)

    Data analysis requirement 3: UID analysis

    (1) Number of users with more than 2 queries

    hive> select count(*) from(select count(*) as query_count  from sogou_partitioned_data group by uid having query_count > 2) a;

    OK

    546353

    Time taken: 69.837 seconds, Fetched: 1 row(s)

    (2) Proportion of users with more than 2 queries

    A:

    hive> select count(*) from(select count(*) as query_count  from sogou_partitioned_data group by uid having query_count > 2) a;

    OK

    546353

    Time taken: 69.837 seconds, Fetched: 1 row(s)

    B:

    hive> select count(distinct(uid)) from sogou_partitioned_data;

    OK

    1352664

    A/B

    hive> select 546353/1352664;

    OK

    0.40390887907122536

    Time taken: 0.255 seconds, Fetched: 1 row(s)
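    The same ratio can also be computed in a single statement instead of dividing two hand-copied numbers; a possible alternative:

    hive> select sum(case when query_count > 2 then 1 else 0 end)/count(*) from (select uid,count(*) as query_count from sogou_partitioned_data group by uid) t;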

    (3) Proportion of clicks with rank no greater than 10 (rank is the fourth column)

    A:

    hive> select count(*) from sogou_partitioned_data where rank < 11;

    4999869

    Time taken: 29.653 seconds, Fetched: 1 row(s)

    B:

    hive> select count(*) from sogou_partitioned_data;

    5000000

    A/B

    hive> select 4999869/5000000;

    OK

    0.9999738

    (4) Proportion of queries where a URL was typed directly

    A:

    hive> select count(*) from sogou_partitioned_data where keyword like '%www%';

    OK

    73979

    B:

    hive> select count(*) from sogou_partitioned_data;

    OK

    5000000

    A/B

    hive> select 73979/5000000;

    OK

    0.0147958

    Data analysis requirement 4: individual user behavior analysis

    (1) UIDs that searched for 仙剑奇侠传 more than 3 times

    hive> select uid,count(*) as cnt from sogou_partitioned_data where keyword='仙剑奇侠传' group by uid having cnt > 3;

    653d48aa356d5111ac0e59f9fe736429 6

    e11c6273e337c1d1032229f1b2321a75 5

    Time taken: 30.732 seconds, Fetched: 2 row(s)

    The statistics above are re-implemented below as Hadoop MapReduce jobs, one job per metric (sections 5.1 to 5.10).

    5.1 Total number of records

    QueryTotalNumber.java

    package com.sogou;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    public class QueryTotalNumber extends Configured implements Tool {

    public static class QueryTotalNumberMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text("QueryTotalNumber");

    private LongWritable ovalue=new LongWritable(1L);

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    if(!"".equals(line)) {

    context.write(okey, ovalue);

    }

    }

    }

    public static class QueryTotalNumberReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context)

    throws IOException, InterruptedException {

    long sum=0;

    for(LongWritable value:values) {

    sum +=value.get();

    }

    ovalue.set(sum);

    context.write(key, ovalue);

    }

    }

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf=new Configuration();

            // must be set when running/debugging against the remote cluster

    conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

    Job job=Job.getInstance(conf,"SogouLogCount");

    job.setJarByClass(QueryTotalNumber.class);

    FileInputFormat.addInputPath(job, new Path("/sougou/sogou_log.txt.flt"));

    job.setMapperClass(QueryTotalNumberMapper.class);

    job.setReducerClass(QueryTotalNumberReducer.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job, new Path("/output/1_QueryTotalNumber"));

    return job.waitForCompletion(true)? 0:1;

    }

    public static void main(String[] args) throws Exception {

    int res=ToolRunner.run(new QueryTotalNumber(), args);

    System.exit(res);

    }

    }
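    The driver hard-codes its HDFS input and output paths, so the job is submitted without arguments. A sketch of packaging and running it, assuming the classes are built into a jar named sogou-analysis.jar (the jar name is not given in the original); the jobs in 5.2 to 5.10 are run the same way with their respective main classes:

    [root@master ~]# hadoop jar sogou-analysis.jar com.sogou.QueryTotalNumber

    [root@master ~]# hdfs dfs -cat /output/1_QueryTotalNumber/part-r-00000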

    5.2 Number of records with a non-empty query

    package com.sogou;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    public class NotNullQueryTotalNumber extends Configured implements Tool {

    public static class NotNullQueryTotalNumberMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text("NotNullQueryTotalNumber");

    private LongWritable ovalue=new LongWritable(1L);

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split(" ");

    String keyword = lineSplited.length > 2 ? lineSplited[2] : null;

    // count only records that actually carry a keyword (the original || condition was always true)

    if(keyword != null && !"".equals(keyword)) {

    context.write(okey, ovalue);

    }

    }

    }

    public static class NotNullQueryTotalNumberReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

    long sum=0;

    for(LongWritable value:values) {

    sum +=value.get();

    }

    ovalue.set(sum);

    context.write(key, ovalue);

    }

    }

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf=new Configuration();

    conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

    Job job=Job.getInstance(conf);

    job.setJarByClass(NotNullQueryTotalNumber.class);

    FileInputFormat.addInputPath(job, new Path("/sougou/sogou_log.txt.flt"));

    job.setMapperClass(NotNullQueryTotalNumberMapper.class);

    job.setReducerClass(NotNullQueryTotalNumberReducer.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job, new Path("/output/2_NotNullQueryTotalNumber"));

    return job.waitForCompletion(true)? 0:1;

    }

    public static void main(String[] args) throws Exception {

    int res=ToolRunner.run(new NotNullQueryTotalNumber(), args);

    System.exit(res);

    }

    }

    5.3 Number of unique (non-duplicated) records

    package com.sogou;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    public class NotRepeatQueryTotalNumber extends Configured implements Tool {

    public static class NotRepeatQueryTotalNumberMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text();

    private LongWritable ovalue=new LongWritable(1L);

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    StringBuffer sb=new StringBuffer();

    String line=value.toString();

    String[] lineSplited=line.split(" ");

    sb.append(lineSplited[0]).append("_")

    .append(lineSplited[1]).append("_")

    .append(lineSplited[2]).append("_")

    .append(lineSplited[5]);

    okey.set(sb.toString());

    context.write(okey, ovalue);

    }

    }

    public static class NotRepeatQueryTotalNumberReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

    long sum=0;

    for(LongWritable value:values) {

    sum +=value.get();

    }

    ovalue.set(sum);

    context.write(key, ovalue);

    }

    }

    public static class NotRepeatQueryTotalNumberMapper2 extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text("NotRepeatQueryTotalNumber");

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String[] splited=value.toString().split("\t"); // job1's TextOutputFormat separates key and value with a tab by default

    long count=Long.valueOf(splited[1]);

    if(count==1) {

    ovalue.set(count);

    context.write(okey, ovalue);

    }

    }

    }

    public static class NotRepeatQueryTotalNumberReducer2 extends Reducer<Text, LongWritable, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

    long sum=0;

    for(LongWritable value:values) {

    sum +=value.get();

    }

    ovalue.set(sum);

    context.write(key, ovalue);

    }

    }

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf=new Configuration();

    conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

    Job job1=Job.getInstance(conf);

    job1.setJarByClass(NotRepeatQueryTotalNumber.class);

    FileInputFormat.addInputPath(job1, new Path("/sougou/sogou_log.txt.flt"));

    job1.setMapperClass(NotRepeatQueryTotalNumberMapper.class);

    job1.setReducerClass(NotRepeatQueryTotalNumberReducer.class);

    job1.setOutputKeyClass(Text.class);

    job1.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job1, new Path("/outdata/sogou_notrepeat"));

    job1.waitForCompletion(true);

    Job job2=Job.getInstance(conf);

    job2.setJarByClass(NotRepeatQueryTotalNumber.class);

    FileInputFormat.addInputPath(job2, new Path("/outdata/sogou_notrepeat"));

    job2.setMapperClass(NotRepeatQueryTotalNumberMapper2.class);

    job2.setReducerClass(NotRepeatQueryTotalNumberReducer2.class);

    job2.setOutputKeyClass(Text.class);

    job2.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job2, new Path("/output/3_NotRepeatQueryTotalNumber"));

    return job2.waitForCompletion(true)? 0:1;

    }

    public static void main(String[] args) throws Exception {

    int res=ToolRunner.run(new NotRepeatQueryTotalNumber(), args);

    System.exit(res);

    }

    }

     

    5.4 Number of distinct UIDs

    package com.sogou;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    public class IndependentUID extends Configured implements Tool {

    public static class IndependentUIDMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text();

    private LongWritable ovalue=new LongWritable(1L);

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split(" ");

    String uid=lineSplited[1];

    if(!"".equals(uid) || uid!=null) {

    okey.set(uid);

    context.write(okey, ovalue);

    }

    }

    }

    public static class IndependentUIDReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

    long sum=0;

    for(LongWritable value:values) {

    sum +=value.get();

    }

    ovalue.set(sum);

    context.write(key, ovalue);

    }

    }

    public static class IndependentUIDMapper2 extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text("independentUID");

    private LongWritable ovalue=new LongWritable(1L);

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split("\t"); // job1's TextOutputFormat separates key and value with a tab by default

    long count=Long.valueOf(lineSplited[1]);

    if(count >=1) {

    context.write(okey, ovalue);

    }

    }

    }

    public static class IndependentUIDReducer2 extends Reducer<Text, LongWritable, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

    long sum=0;

    for(LongWritable value:values) {

    sum +=value.get();

    }

    ovalue.set(sum);

    context.write(key, ovalue);

    }

    }

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf=new Configuration();

    conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

    Job job1=Job.getInstance(conf);

    job1.setJarByClass(IndependentUID.class);

    FileInputFormat.addInputPath(job1, new Path("/sougou/sogou_log.txt.flt"));

    job1.setMapperClass(IndependentUIDMapper.class);

    job1.setReducerClass(IndependentUIDReducer.class);

    job1.setOutputKeyClass(Text.class);

    job1.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job1, new Path("/outdata/sogou_independentUID"));

    job1.waitForCompletion(true);

    Job job2=Job.getInstance(conf);

    job2.setJarByClass(IndependentUID.class);

    FileInputFormat.addInputPath(job2, new Path("/outdata/sogou_independentUID"));

    job2.setMapperClass(IndependentUIDMapper2.class);

    job2.setReducerClass(IndependentUIDReducer2.class);

    job2.setOutputKeyClass(Text.class);

    job2.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job2, new Path("/output/4_IndependentUID"));

    return job2.waitForCompletion(true)? 0:1;

    }

    public static void main(String[] args) throws Exception {

    int res=ToolRunner.run(new IndependentUID(), args);

    System.exit(res);

    }

    }

     

    5.5 Query frequency ranking (the 50 most frequent keywords)

    package com.sogou;

    import java.io.IOException;

    import java.util.Comparator;

    import java.util.Map;

    import java.util.TreeMap;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    public class QueryFreRankTop50 extends Configured implements Tool {

    public static class QueryFreRankMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text();

    private LongWritable ovalue=new LongWritable(1L);

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split(" ");

    String keyword=lineSplited[2];

    if(!"".equals(keyword) || keyword!=null) {

    okey.set(keyword);

    context.write(okey, ovalue);

    }

    }

    }

    public static class QueryFreRankReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

    long sum=0;

    for(LongWritable value:values) {

    sum +=value.get();

    }

    ovalue.set(sum);

    context.write(key, ovalue);

    }

    }

    public static class Top50Mapper extends Mapper<LongWritable, Text, LongWritable, Text>{

    private static final int K=50;

    private TreeMap<Long, String> tm=new TreeMap<Long, String>();

    private LongWritable okey=new LongWritable();

    private Text ovalue=new Text();

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split("\t"); // job1's output lines are tab-separated (keyword \t count)

    String keyword=lineSplited[0];

    long count=Long.valueOf(lineSplited[1].trim());

    tm.put(count, keyword);
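    // note: the TreeMap is keyed by count, so keywords that share the same count overwrite one another and only one of them survives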

    if(tm.size() > K) {

    tm.remove(tm.firstKey());

    }

    }

    @Override

    protected void cleanup(Mapper<LongWritable, Text, LongWritable, Text>.Context context)

    throws IOException, InterruptedException {

    for(Map.Entry<Long,String> entry:tm.entrySet()) {

    long count=entry.getKey();

    String keyword=entry.getValue();

    okey.set(count);

    ovalue.set(keyword);

    context.write(okey, ovalue);

    }

    }

    }

    public static class Top50Reducer extends Reducer<LongWritable, Text, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    private Text okey=new Text();

    private static final int K=50;

    private TreeMap<Long, String> tm=new TreeMap<Long, String>(new Comparator<Long>() {

    @Override

    public int compare(Long o1, Long o2) {

    return o2.compareTo(o1);

    }

    });

    @Override

    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

    for(Text value:values) {

    tm.put(key.get(), value.toString());

    if(tm.size() > K) {

    tm.remove(tm.lastKey()); // with the descending comparator, lastKey is the smallest count; keep the 50 largest

    }

    }

    }

    @Override

    protected void cleanup(Reducer<LongWritable, Text, Text, LongWritable>.Context context)

    throws IOException, InterruptedException {

    for(Map.Entry<Long, String> entry:tm.entrySet()) {

    String keyword=entry.getValue();

    long count=entry.getKey();

    okey.set(keyword);

    ovalue.set(count);

    context.write(okey, ovalue);

    }

    }

    }

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf=new Configuration();

    conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

    Job job1=Job.getInstance(conf);

    job1.setJarByClass(QueryFreRankTop50.class);

    FileInputFormat.addInputPath(job1, new Path("/sougou/sogou_log.txt.flt"));

    job1.setMapperClass(QueryFreRankMapper.class);

    job1.setReducerClass(QueryFreRankReducer.class);

    job1.setOutputKeyClass(Text.class);

    job1.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job1, new Path("/outdata/sogou_queryFreRank"));

    job1.waitForCompletion(true);

    Job job2=Job.getInstance(conf);

    job2.setJarByClass(QueryFreRankTop50.class);

    FileInputFormat.addInputPath(job2, new Path("/outdata/sogou_queryFreRank"));

    job2.setMapperClass(Top50Mapper.class);

    job2.setMapOutputKeyClass(LongWritable.class);

    job2.setMapOutputValueClass(Text.class);

    job2.setReducerClass(Top50Reducer.class);

    job2.setOutputKeyClass(Text.class);

    job2.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job2, new Path("/output/5_QueryFreRankTop50"));

    return job2.waitForCompletion(true)? 0:1;

    }

    public static void main(String[] args) throws Exception {

    int res=ToolRunner.run(new QueryFreRankTop50(), args);

    System.exit(res);

    }

    }

    5.6 Number of users with more than 2 queries

    package com.sogou;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    public class QueriesGreaterThan2 extends Configured implements Tool {

    public static class NumQueGreTwoMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text();

    private LongWritable ovalue=new LongWritable(1);

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split(" ");

    String uid=lineSplited[1];

    if(uid != null && !"".equals(uid)) { // skip missing/empty uids (the original || condition was always true)

    okey.set(uid);

    context.write(okey, ovalue);

    }

    }

    }

    public static class NumQueGreTwoReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

    long sum=0;

    for(LongWritable value:values) {

    sum +=value.get();

    }

    if(sum >2) {

    ovalue.set(sum);

    context.write(key, ovalue);

    }

    }

    }

    public static class NumQueGreTwoToOneMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text("NumQueGreTwo");

    private LongWritable ovalue=new LongWritable(1);

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split("\t"); // job1's output lines are tab-separated (uid \t count)

    long count=Long.valueOf(lineSplited[1]);

    if(count > 2) {

    context.write(okey, ovalue);

    }

    }

    }

    public static class NumQueGreTwoToOneReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

    long sum =0;

    for(LongWritable value:values) {

    sum +=value.get();

    }

    ovalue.set(sum);

    context.write(key, ovalue);

    }

    }

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf=new Configuration();

    conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

    Job job1=Job.getInstance(conf);

    job1.setJarByClass(QueriesGreaterThan2.class);

    FileInputFormat.addInputPath(job1, new Path("/sougou/sogou_log.txt.flt"));

    job1.setMapperClass(NumQueGreTwoMapper.class);

    job1.setReducerClass(NumQueGreTwoReducer.class);

    job1.setOutputKeyClass(Text.class);

    job1.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job1, new Path("/outdata/sogou_numQueGreTwo"));

    job1.waitForCompletion(true);

    Job job2=Job.getInstance(conf);

    job2.setJarByClass(QueriesGreaterThan2.class);

    FileInputFormat.addInputPath(job2, new Path("/outdata/sogou_numQueGreTwo"));

    job2.setMapperClass(NumQueGreTwoToOneMapper.class);

    job2.setReducerClass(NumQueGreTwoToOneReducer.class);

    job2.setOutputKeyClass(Text.class);

    job2.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job2, new Path("/output/6_QueriesGreaterThan2"));

    return job2.waitForCompletion(true)? 0:1;

    }

    public static void main(String[] args) throws Exception {

    int res=ToolRunner.run(new QueriesGreaterThan2(), args);

    System.exit(res);

    }

    }

    5.7 Proportion of users with more than 2 queries

    package com.sogou;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.DoubleWritable;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    //import org.apache.hadoop.mapreduce.Partitioner;

    import org.apache.hadoop.mapreduce.Reducer;

    //import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    public class RatioOfQueriesGreaterThan2 extends Configured implements Tool {

    public static class UserDutyThanTwoMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text("userDutyThanTwn");

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split("\t"); // the upstream MapReduce outputs are tab-separated key/value pairs

    long count=Long.parseLong(lineSplited[1]);

    ovalue.set(count);

    context.write(okey, ovalue);

    }

    }

    public static class UserDutyThanTwoReducere extends Reducer<Text, LongWritable, Text, DoubleWritable>{

    private Text okey=new Text("userDutyThanTwn");

    private DoubleWritable percent=new DoubleWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context)

    throws IOException, InterruptedException {

    StringBuffer buffer=new StringBuffer();

    for(LongWritable value:values) {

    buffer.append(value).append(",");

    }

    String[] moleculeOrDenominator=buffer.toString().split(",");

    double a=Double.valueOf(moleculeOrDenominator[0]);

    double b=Double.valueOf(moleculeOrDenominator[1]);

    double per=0.0;

    if(a<=b) {

    per=a/b;

    }else {

    per=b/a;

    }

    percent.set(per);

    context.write(okey, percent);

    }

    }

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf=new Configuration();

    conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

    Job job1=Job.getInstance(conf);

    job1.setJarByClass(RatioOfQueriesGreaterThan2.class);

    MultipleInputs.addInputPath(job1, new Path("/output/4_IndependentUID"),

    TextInputFormat.class, UserDutyThanTwoMapper.class);

    MultipleInputs.addInputPath(job1, new Path("/output/6_QueriesGreaterThan2"),

    TextInputFormat.class, UserDutyThanTwoMapper.class);

    job1.setMapOutputKeyClass(Text.class);

    job1.setMapOutputValueClass(LongWritable.class);

    job1.setReducerClass(UserDutyThanTwoReducere.class);

    job1.setOutputKeyClass(Text.class);

    job1.setOutputValueClass(DoubleWritable.class);

    FileOutputFormat.setOutputPath(job1, new Path("/output/7_RatioOfQueriesGreaterThan2"));

    return job1.waitForCompletion(true)? 0 : 1;

    }

    public static void main(String[] args) throws Exception {

    int res=ToolRunner.run(new RatioOfQueriesGreaterThan2(), args);

    System.exit(res);

    }

    }

    5.8 Proportion of clicks with rank no greater than 10 (rank is the fourth column)

    package com.sogou;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.DoubleWritable;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    public class RatioOfClickTimesInTen extends Configured implements Tool {

    public static class NumberOfLessTenMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text("numberOfRankTen");

    private LongWritable ovalue=new LongWritable(1L);

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split(" ");

    int rank=Integer.parseInt(lineSplited[3]);

    if(rank < 11) {

    context.write(okey, ovalue);

    }

    }

    }

    public static class NumberOfRankTenMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text("numberOfRankTen");

    private LongWritable ovalue=new LongWritable(1L);

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split(" ");

    int rank=Integer.parseInt(lineSplited[3]);

    if(rank >= 0) {

    context.write(okey, ovalue);

    }

    }

    }

    public static class NumberOfRankTenReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

    long sum=0;

    for(LongWritable value:values) {

    sum +=value.get();

    }

    ovalue.set(sum);

    context.write(key, ovalue);

    }

    }

    public static class UserDutyThanTwoMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text();

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split("\t"); // the upstream MapReduce outputs are tab-separated key/value pairs

    String word=lineSplited[0];

    long count=Long.parseLong(lineSplited[1]);

    okey.set(word);

    ovalue.set(count);

    context.write(okey, ovalue);

    }

    }

    public static class UserDutyThanTwoReducere extends Reducer<Text, LongWritable, Text, DoubleWritable>{

    private DoubleWritable percent=new DoubleWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context)

    throws IOException, InterruptedException {

    StringBuffer buffer=new StringBuffer();

    for(LongWritable value:values) {

    buffer.append(value).append(",");

    }

    String[] moleculeOrDenominator=buffer.toString().split(",");

    double a=Double.valueOf(moleculeOrDenominator[0]);

    double b=Double.valueOf(moleculeOrDenominator[1]);

    double per=0.0;

    if(a<=b) {

    per=a/b;

    }else {

    per=b/a;

    }

    percent.set(per);

    context.write(key, percent);

    }

    }

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf=new Configuration();

    conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

    Job job1=Job.getInstance(conf);

    job1.setJarByClass(RatioOfClickTimesInTen.class);

    FileInputFormat.addInputPath(job1, new Path("/sougou/sogou_log.txt.flt"));

    job1.setMapperClass(NumberOfLessTenMapper.class);

    job1.setReducerClass(NumberOfRankTenReducer.class);

    job1.setOutputKeyClass(Text.class);

    job1.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job1, new Path("/outdata/sogou_numberOfLessTen"));

    job1.waitForCompletion(true);

    Job job2=Job.getInstance(conf);

    job2.setJarByClass(RatioOfClickTimesInTen.class);

    FileInputFormat.addInputPath(job2, new Path("/sougou/sogou_log.txt.flt"));

    job2.setMapperClass(NumberOfRankTenMapper.class);

    job2.setReducerClass(NumberOfRankTenReducer.class);

    job2.setOutputKeyClass(Text.class);

    job2.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job2, new Path("/outdata/sogou_numberOfRankTen"));

    job2.waitForCompletion(true);

    Job job3=Job.getInstance(conf);

    job3.setJarByClass(RatioOfClickTimesInTen.class);

    MultipleInputs.addInputPath(job3, new Path("/outdata/sogou_numberOfLessTen"),

    TextInputFormat.class, UserDutyThanTwoMapper.class);

    MultipleInputs.addInputPath(job3, new Path("/outdata/sogou_numberOfRankTen"),

    TextInputFormat.class, UserDutyThanTwoMapper.class);

    job3.setMapOutputKeyClass(Text.class);

    job3.setMapOutputValueClass(LongWritable.class);

    job3.setReducerClass(UserDutyThanTwoReducere.class);

    job3.setOutputKeyClass(Text.class);

    job3.setOutputValueClass(DoubleWritable.class);

    FileOutputFormat.setOutputPath(job3, new Path("/output/8_RatioOfClickTimesInTen"));

    return job3.waitForCompletion(true)? 0 : 1;

    }

    public static void main(String[] args) throws Exception {

    int res=ToolRunner.run(new RatioOfClickTimesInTen(), args);

    System.exit(res);

    }

    }

    5.9 Proportion of queries where a URL was typed directly

    package com.sogou;

    import java.io.IOException;

    import java.util.regex.Pattern;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.DoubleWritable;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    public class RatioOfDirectInputURL extends Configured implements Tool {

    public static class RatioOfDirectInputURLMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text("SubInputURLPerMapper");

    private LongWritable ovalue=new LongWritable(1L);

    String pattern=".*www.*";

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split(" ");

    String keyword=lineSplited[2];

    if(Pattern.matches(pattern, keyword)) {

    context.write(okey, ovalue);

    }

    }

    }

    public static class RatioOfDirectInputURLReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

    long sum=0;

    for(LongWritable value:values) {

    sum +=value.get();

    }

    ovalue.set(sum);

    context.write(key, ovalue);

    }

    }

    public static class UserDutyThanTwoMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text("subInputURLPer");

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split("\t"); // the upstream MapReduce outputs are tab-separated key/value pairs

    @SuppressWarnings("unused")

    String word=lineSplited[0];

    long count=Long.parseLong(lineSplited[1]);

    ovalue.set(count);

    context.write(okey, ovalue);

    }

    }

    public static class UserDutyThanTwoReducere extends Reducer<Text, LongWritable, Text, DoubleWritable>{

    private DoubleWritable percent=new DoubleWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context)

    throws IOException, InterruptedException {

    StringBuffer buffer=new StringBuffer();

    for(LongWritable value:values) {

    buffer.append(value).append(",");

    }

    String[] moleculeOrDenominator=buffer.toString().split(",");

    double a=Double.valueOf(moleculeOrDenominator[0]);

    double b=Double.valueOf(moleculeOrDenominator[1]);

    double per=0.0;

    if(a<=b) {

    per=a/b;

    }else {

    per=b/a;

    }

    percent.set(per);

    context.write(key, percent);

    }

    }

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf=new Configuration();

    conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

    Job job1=Job.getInstance(conf);

    job1.setJarByClass(RatioOfDirectInputURL.class);

    FileInputFormat.addInputPath(job1, new Path("/sougou/sogou_log.txt.flt"));

    job1.setMapperClass(RatioOfDirectInputURLMapper.class);

    job1.setReducerClass(RatioOfDirectInputURLReducer.class);

    job1.setOutputKeyClass(Text.class);

    job1.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job1, new Path("/outdata/sogou_subInputURLPer"));

    job1.waitForCompletion(true);

    Job job2=Job.getInstance(conf);

    job2.setJarByClass(RatioOfDirectInputURL.class);

    MultipleInputs.addInputPath(job2, new Path("/outdata/sogou_subInputURLPer"),

    TextInputFormat.class, UserDutyThanTwoMapper.class);

    MultipleInputs.addInputPath(job2, new Path("/outdata/sogou_numberOfRankTen"),

    TextInputFormat.class, UserDutyThanTwoMapper.class);

    job2.setMapOutputKeyClass(Text.class);

    job2.setMapOutputValueClass(LongWritable.class);

    job2.setReducerClass(UserDutyThanTwoReducere.class);

    job2.setOutputKeyClass(Text.class);

    job2.setOutputValueClass(DoubleWritable.class);

    FileOutputFormat.setOutputPath(job2, new Path("/output/9_RatioOfDirectInputURL"));

    return job2.waitForCompletion(true)? 0 : 1;

    }

    public static void main(String[] args) throws Exception {

    int res=ToolRunner.run(new RatioOfDirectInputURL(), args);

    System.exit(res);

    }

    }

    5.10 UIDs that searched for "仙剑奇侠传" more than 3 times

    package com.sogou;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.conf.Configured;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.Tool;

    import org.apache.hadoop.util.ToolRunner;

    public class QuerySearch extends Configured implements Tool {

    public static class QuerySearchMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    private Text okey=new Text();

    private LongWritable ovalue=new LongWritable(1L);

    @Override

    protected void map(LongWritable key, Text value, Context context)

    throws IOException, InterruptedException {

    String line=value.toString();

    String[] lineSplited=line.split(" ");

    String uid=lineSplited[1];

    String keyword=lineSplited[2];

    if(keyword.equals("仙剑奇侠传")) {

    String uid_keyword=uid+"_"+keyword;

    okey.set(uid_keyword);

    context.write(okey, ovalue);

    }

    }

    }

    public static class QuerySearchReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    private LongWritable ovalue=new LongWritable();

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

    long sum =0;

    for(LongWritable value:values) {

    sum +=value.get();

    }

    if(sum > 3) {

    ovalue.set(sum);

    context.write(key, ovalue);

    }

    }

    }

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf=new Configuration();

    conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

    Job job=Job.getInstance(conf);

    job.setJarByClass(QuerySearch.class);

    FileInputFormat.addInputPath(job, new Path("/sougou/sogou_log.txt.flt"));

    job.setMapperClass(QuerySearchMapper.class);

    job.setReducerClass(QuerySearchReducer.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job, new Path("/output/10_QuerySearch"));

    return job.waitForCompletion(true)? 0:1;

    }

    public static void main(String[] args) throws Exception {

    int res=ToolRunner.run(new QuerySearch(), args);

    System.exit(res);

    }

    }

    6. Import the generated result files into HBase (one table) via the Java API

    package com.sogou;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.hbase.HBaseConfiguration;

    import org.apache.hadoop.hbase.client.Mutation;

    import org.apache.hadoop.hbase.client.Put;

    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;

    import org.apache.hadoop.hbase.mapreduce.TableReducer;

    import org.apache.hadoop.hbase.util.Bytes;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.NullWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class Hbase_Import {

    // name of the HBase table the reducer writes to

        // private static String tableName = "sogou_data_analysis_results_table";

    private static String tableName = "sogou_data";

    // initialize the HBase connection configuration

    static Configuration conf = null;

    static {

    conf = HBaseConfiguration.create();

    conf.set("hbase.rootdir", "hdfs://10.49.23.127:9000/hbase");

    conf.set("hbase.master", "hdfs://10.49.23.127:60000");

    conf.set("hbase.zookeeper.property.clientPort", "2181");

    conf.set("hbase.zookeeper.quorum", "10.49.23.127,10.49.23.134,10.49.23.129");

    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);

    // conf.set("dfs.socket.timeout", "180000");

    }

    public static class BatchMapper extends

    Mapper<LongWritable, Text, LongWritable, Text> {

    protected void map(LongWritable key, Text value,

    Mapper<LongWritable, Text, LongWritable, Text>.Context context)

    throws IOException, InterruptedException {

    String line = value.toString();

    Text v2s = new Text();

    v2s.set(line);

    context.write(key, v2s);

    }

    }

    public static class BatchReducer extends

    TableReducer<LongWritable, Text, NullWritable> {

    private String family = "info";

    @Override

    protected void reduce(

    LongWritable arg0,

    Iterable<Text> v2s,

    Reducer<LongWritable, Text, NullWritable, Mutation>.Context context)

    throws IOException, InterruptedException {

    for (Text v2 : v2s) {

    String[] splited = v2.toString().split("\t"); // the MapReduce result files are tab-separated key/value pairs

    String rowKey = splited[0];

    Put put = new Put(rowKey.getBytes());

    // put.addColumn(family.getBytes(), "raw".getBytes(), v2.toString().getBytes());

    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("raw"), Bytes.toBytes(v2.toString()));

    context.write(NullWritable.get(), put);

    }

    // for (Text v2 : v2s) {

    // String[] splited = v2.toString().split(" ");

    // String rowKey = splited[0];

    // Put put = new Put(Bytes.toBytes("rowkey"));

    //// put.addColumn(family.getBytes(), "raw".getBytes(), v2.toString().getBytes());

    //

    // put.addColumn(Bytes.toString(family), Bytes.toBytes("raw"), Bytes.toBytes(v2.toString()));

    // context.write(NullWritable.get(), put);

    // }

    }

    }

    public static void imputil(String str) throws IOException, ClassNotFoundException,

    InterruptedException {

    Job job = Job.getInstance(conf, Hbase_Import.class.getSimpleName());

    TableMapReduceUtil.addDependencyJars(job);

    job.setJarByClass(Hbase_Import.class);

    FileInputFormat.setInputPaths(job,str);

    job.setInputFormatClass(TextInputFormat.class);

    job.setMapperClass(BatchMapper.class);

    job.setMapOutputKeyClass(LongWritable.class);

    job.setMapOutputValueClass(Text.class);

    job.setReducerClass(BatchReducer.class);

    job.setOutputFormatClass(TableOutputFormat.class);

    job.waitForCompletion(true);

    }

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

    String[] str={

    "hdfs://10.49.23.127:9000/output/1_QueryTotalNumber",

    "hdfs://10.49.23.127:9000/output/2_NotNullQueryTotalNumber",

    "hdfs://10.49.23.127:9000/output/3_NotRepeatQueryTotalNumber",

    "hdfs://10.49.23.127:9000/output/4_IndependentUID",

    "hdfs://10.49.23.127:9000/output/5_QueryFreRankTop50",

    "hdfs://10.49.23.127:9000/output/6_QueriesGreaterThan2",

    "hdfs://10.49.23.127:9000/output/7_RatioOfQueriesGreaterThan2",

    "hdfs://10.49.23.127:9000/output/8_RatioOfClickTimesInTen",

    "hdfs://10.49.23.127:9000/output/9_RatioOfDirectInputURL",

    "hdfs://10.49.23.127:9000/output/10_QuerySearch"};

    for (String stri:str){

    imputil(stri);

    }

    }


    }
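    The importer writes to the table named by tableName ("sogou_data") with column family "info", and that table must exist before the job runs. A minimal hbase shell sketch (assumed, not part of the original) for creating it and spot-checking the imported rows afterwards:

    hbase(main):001:0> create 'sogou_data','info'

    hbase(main):002:0> scan 'sogou_data',{LIMIT => 10}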

    On the old vs. new MultipleInputs jar/package issue:

    https://stackoverflow.com/questions/26434790/multipleinputs-not-working-hadoop-2-5-0

    On Java character-encoding issues:

    https://blog.csdn.net/u011597415/article/details/53506574
