zoukankan      html  css  js  c++  java
  • MapReduce读取hdfs上文件,建立词频的倒排索引到Hbase

    Hdfs上的数据文件为T0,T1,T2(无后缀):

    T0:

    What has come into being in him was life, and the life was the light of all people. 
    The light shines in the darkness, and the darkness did not overcome it. Enter through the narrow gate;
    for the gate is wide and the road is easy that leads to destruction, and there are many who take it.
    For the gate is narrow and the road is hard that leads to life, and there are few who find it

    T1:

    Where, O death, is your victory? Where, O death, is your sting? The sting of death is sin, and.
    The power of sin is the law. But thanks be to God, who gives us the victory through our Lord Jesus Christ.
    The grass withers, the flower fades, when the breath of the LORD blows upon it; surely the people are grass.
    The grass withers, the flower fades; but the word of our God will stand forever.

    T2:

    What has come into being in him was life, and the life was the light of all people. 
    The light shines in the darkness, and the darkness did not overcome it. Enter through the narrow gate;
    for the gate is wide and the road is easy that leads to destruction, and there are many who take it.
    For the gate is narrow and the road is hard that leads to life, and there are few who find it.

    实现代码如下:

    package com.pro.bq;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    
    /**
     * Builds a per-file word-frequency inverted index: reads text files from HDFS,
     * counts each word per source file, and stores the result in the HBase table
     * "index" (row = word, family = "filesum", qualifier = filename, value = count).
     * The table (with column family "filesum") must exist before the job runs.
     */
    public class DataFromHdfs {
        /**
         * Mapper: tokenizes each input line and emits ("word:filename", "1"),
         * so counts can later be aggregated per (word, file) pair.
         */
        public static class LocalMap extends Mapper<Object, Text, Text, Text>
        {
            private static final Text ONE = new Text("1");

            public void map(Object key, Text value,Context context)
                    throws IOException, InterruptedException {

                FileSplit split = (FileSplit) context.getInputSplit();
                // Use the last path component (e.g. "T0") directly. The original
                // searched the full URI for the letter 'T', which breaks on any
                // path that happens to contain a 'T' elsewhere.
                String fileName = split.getPath().getName();

                StringTokenizer tokenStr = new StringTokenizer(value.toString());
                while (tokenStr.hasMoreTokens())
                {
                    String token = tokenStr.nextToken();
                    // Strip ALL trailing punctuation. The original removed exactly
                    // one character whenever the token *contained* punctuation, so
                    // "and." -> "and" worked but "it;" with ";;" or a token with
                    // punctuation mid-word lost its final letter instead.
                    int end = token.length();
                    while (end > 0 && ",.;?".indexOf(token.charAt(end - 1)) >= 0) {
                        end--;
                    }
                    if (end == 0) {
                        continue; // token was pure punctuation; nothing to index
                    }
                    token = token.substring(0, end);
                    context.write(new Text(token + ":" + fileName), ONE);
                }
            }
        }
        /**
         * Combiner: pre-sums the "1" values for each (word, file) key.
         * The key is passed through UNCHANGED — a combiner may run zero or more
         * times, so it must never alter the key. (The original rewrote the key
         * to "word" here, which corrupted the data whenever the combiner was
         * skipped: the reducer then received "word:file" keys with value "1".)
         */
        public static class LocalCombiner extends Reducer<Text, Text, Text, Text>
        {

            public void reduce(Text key, Iterable<Text> values,Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (Text val : values)
                {
                    sum += Integer.parseInt(val.toString());
                }
                context.write(key, new Text(String.valueOf(sum)));
            }
        }
        /**
         * Reducer: receives ("word:filename", partial counts), sums them, and
         * writes one HBase cell: row = word, family = "filesum",
         * qualifier = filename, value = total count.
         * Using the filename as the column qualifier preserves one count per
         * file; the original wrote the fixed columns "filename"/"count" in a
         * loop on the same row, so later files silently overwrote earlier ones
         * and each word kept only a single posting.
         */
        public static class TableReduce extends TableReducer<Text, Text, ImmutableBytesWritable>
        {

            public void reduce(Text key, Iterable<Text> values,Context context)
                    throws IOException, InterruptedException {
                String composite = key.toString();
                int sep = composite.indexOf(':');
                if (sep < 0) {
                    return; // malformed key; nothing sensible to store
                }
                String word = composite.substring(0, sep);
                String fileName = composite.substring(sep + 1);

                int sum = 0;
                for (Text val : values)
                {
                    sum += Integer.parseInt(val.toString());
                }

                Put put = new Put(Bytes.toBytes(word));
                put.add(Bytes.toBytes("filesum"), Bytes.toBytes(fileName),
                        Bytes.toBytes(String.valueOf(sum)));
                context.write(new ImmutableBytesWritable(Bytes.toBytes(word)), put);
            }
        }
        /**
         * Configures and submits the job. The HBase table "index" with column
         * family "filesum" must be created beforehand (shell:
         * create 'index','filesum'), otherwise the job fails at write time.
         */
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            conf = HBaseConfiguration.create(conf);
            // conf.set("hbase.zookeeper.quorum.", "localhost");
            String hdfsPath = "hdfs://localhost:9000/user/haduser/";
            String[] argsStr = new String[]{hdfsPath + "input/reverseIndex"};
            String[] otherArgs = new GenericOptionsParser(conf, argsStr).getRemainingArgs();
            Job job = new Job(conf);
            job.setJarByClass(DataFromHdfs.class);

            job.setMapperClass(LocalMap.class);
            job.setCombinerClass(LocalCombiner.class);
            // No setReducerClass here: initTableReducerJob installs TableReduce
            // itself and wires the job's output to the HBase "index" table.

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class); // combiner in/out types match the map output

            TableMapReduceUtil.initTableReducerJob("index", TableReduce.class, job);

            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    运行之前用Shell创建"index"表,列族名必须与代码中使用的"filesum"一致,命令:" create 'index','filesum' "(原文写作'filensum'是笔误,列族不匹配会导致写入时报 NoSuchColumnFamilyException)

    程序运行之后,再执行shell命令:" scan 'index' ",执行效果如下:

  • 相关阅读:
    Sitecore Digital Marketing System, Part 1: Creating personalized, custom content for site visitors(自定义SiteCore中的 Item的Personalize的Condition) -摘自网络
    Send email alert from Performance Monitor using PowerShell script (检测windows服务器的cpu 硬盘 服务等性能,发email的方法) -摘自网络
    使用Mono Cecil 动态获取运行时数据 (Atribute形式 进行注入 用于写Log) [此文报考 xxx is declared in another module and needs to be imported的解决方法]-摘自网络
    秒杀 ILSpy 等反编译利器 DotNet Resolver
    Nagios:企业级系统监控方案
    C# Asp.net中的AOP框架 Microsoft.CCI, Mono.Cecil, Typemock Open-AOP API, PostSharp -摘自网络 (可以利用反射 Attribute 进行面向切面编程 可以用在记录整个方法的Log方面)
    Windows性能监视器之CPU、硬盘、IO等监控方法详解-摘自网络
    网站防刷方案 -摘自网络
    利用XSD配合XSLT產出特定格式Word檔案 -摘自网络
    asp页面快速找到菜单按钮转向的页面的方法
  • 原文地址:https://www.cnblogs.com/wzyj/p/3565172.html
Copyright © 2011-2022 走看看