zoukankan      html  css  js  c++  java
  • MapReduce Design Patterns(7、输入输出模式)(十三)

    http://blog.csdn.net/cuirong1986/article/details/8510162


    CHAPTER 7.Input and Output Patterns


    本章关注一个最经常忽略的问题,来改进MapReduce value:自定义输入和输出。我们并不会总使用Mapreduce本身的方式加载或存储数据。有时,可以跳过在hdfs存储数据这项耗时的阶段,仅存储一些数据,不是全部的,或直接在MapReduce结束后输送给后面的处理。有时,基本的Hadoop规范,文件块和输入分割不能完成你需要的事情,所以要使用自定义的InputFormatOutputFormat

     

    本章三个模式处理输入:generating dataexternal source input,和partition pruniing。都有一个有趣的属性:map阶段完全不会意识到拿到输入键值对之前会发生复杂的事情。使用自定义的输入格式抽象出你要加载数据的方法的细节,是一种有效的方式。

     

    另一方面,hadoop不总是按你需要的方式存储数据。本章的模式external source output,会把数据写到hadoop以外的系统。自定义的输出格式也会不让mapreduce阶段意识到数据输出时发生的复杂的事情。

    Customizing Input and Output in Hadoop

    Hadoop允许你修改数据load的方式,有两个主要途径:配置输入有多少连续的块,配置记录如何出现在map阶段。相关的两个类是RecordReaderInputFormat。他们随着Hadoop框架运行,跟mapperreducer运行方式相似。也允许修改数据存储的方式,通过OutputFormatRecordWriter

    InputFormat

    Hadoop依赖job的输入格式做三件事:

    1.校验job的输入配置,例如数据是否存在。
    2.分割文件块为逻辑上的inputSplit类型的块,每一个对应一个map任务。

    3.创建RecordReader的实现从inputsplit创建键值对。这些键值对一个一个发送到mapper

     

    最常用的输入格式的子类是FileInputFormathadoop默认是TextInputFormat。这个类首先校验job的输入,保证输入路径的存在。然后根据文件字节数大小逻辑分割输入文件,使用块大小作为分割边界值。例如,160M的文件,块大小64M时分成三个逻辑块,0M-64M64m-128M,128M-160M.每个map任务都会对应其中一个块,然后RecordReader负责生成键值对。

     

    通常,recordReader有额外的修复边界问题的责任,因为输入分割边界是任意的,很有可能不是记录的边界。例如,TextInputFormat使用LineRecordReader读取文本文件对每个map的每一文本行创建键值对,例如用换行符分割。键是读到的一行的字节数,值是整行字符串。因为它不像输入分片的字节块,会用换行符分开,LineRecordRead会负责读到行的末尾保证读到一条完整的记录。不同数据块的这点数据(一个完整行)理论上可能不在相同的节点,所以从所在主机上读。这个读由FSDataInputStream类处理,我们就不必处理去哪儿找数据块。

     

    使用自己的格式时不要害怕经过了分割的边界,只需要检测没有重复或丢失数据。

     

    Notice:自定义的输入格式不限于文件输入。你可以把输入表示为InputSplit对象和键值对,自定义或其它的,可以在一个MapReduce job里并行读入任何东西到map阶段。只需要记住输入分片表示什么和利用数据本地性的优势。

     

    InputFormat抽象类有两个抽象方法:

    getSplits

    典型的实现是利用JobConText对象获取配置的输入并返回该对象的listInputsplit有个方法返回表示数据在集群中位置的机器的数组,提示TaskTracker应该执行的map任务。这个方法也是验证配置的正确性或抛出需要的异常的合适的地方,因为方法使用在前面。(例如在提交到jobTracker之前)

     

    CreateRecordReader

    这个方法在后面使用,用来生成RecordReader的实现,随后详细讨论。通常,一个新实例创建并立即返回,因为record reader有个初始化方法被框架调用。

     

    RecordReader

    RecordReader是用来根据给的InputSplit创建键值对的。因为inputsplit表示了分片的字节范围,使mapper的处理有意义。这就是为什么hadoophe MapReduce被认为是“读时模式”。模式是在RecordReader中定义的,单独的基于RecordReader的实现,而不是基于我们希望的job的输入。从输入源读取字节转换成writablecomparable key和一个writable value。创建自定义输入格式时经常使用自定义的类型,因为这是一种好的面向对象编程的方式来把信息给mapper

     

    RecordReader使用数据和由inputsplit创建的边界生成键值对。在基于文件的输入的环境中,“start“是文件中的RecordReader应该开始生成键值对的字节偏移量。“end”是应该停止读记录的偏移量。就api而言,没有硬性的边界:不能阻止一个开发人员把整个文件作为一个map的输入,当然这是不建议的,经常需要越过边界读数据,来保证读到一条完整的记录。

     

    考虑xml的问题。当使用TextInputFormat抽取每行时,xml元素通常不在同一行,会被MapReduce input 分割。当读到输入分区边界的“end“之后,就得到一条完整记录。找到记录的末尾以后,你仅需要保证每条记录的读从xml元素的开始开始。找到inputsplit的开始之后,继续读直到开始的xml标签被读到。这允许MapReduce框架覆盖整个xml文件的内容,但不会重复任何xml记录。由于向前找xml元素的开始而跳过的xml内容会被前面的map任务处理。

     

    recordReader 抽象类有几个方法要覆盖。

    Initialize

    map任务指定的inputSplitTaskAttemptContext作为本方法的参数。对基于文件的输入格式,这是寻找开始读文件时的字节偏移的好时机。

    GetCurrentKey and getCurrentValue

    这两个方法被框架使用生成键值对发送给mapper。尽可能重用这两个方法返回的对象

    nextKeyValue

    类似inputFormat类里的对应方法,读一个简单的键值对并返回true,直到数据读完。

    GetProgress

    这是个可选的方法,用于框架对度量的收集。

    Close

    由框架使用,在没有键值对要处理时清除资源。

     

    outputFormat

    hadoop依靠job的输出格式做两个主要的任务:

    1.检验job的输出配置。

    2.创建RecordWriter的实现写job的输出。

     

    FileInputFormat相对应的,FileOutputFormat处理基于文件的输出。因为MapReduce job的大多数输出写到hdfs,很多基于文件的输出格式相应的api都能解决大部分的需求。Hadoop默认使用TextOutputFormat,存储tab分隔的键值对到配置的hdfs的输出目录。TextOutputFormat也检验开始运行job之前输出目录不能存在。

     

    TextoutputFormat 使用LineRecordWriter对每个mapreduce任务写键值对,根据是否是reduce阶段。这个类使用toString方法序列化每一键值对到存储在hdfspart文件里,用tab分隔键值。这个分隔符是默认的,能通过配置改变。

     

    inputFormat类似,数据不会受限于只存在hdfs上。只要能用java把键值对写到其它源,例如jdbc,就可以用MapReduce做批量写。只需要保证你要写到的地方能处理多个任务产生的连接。

    outputFormat抽象类有三个抽象方法需要实现:

     

    checkOutputSpecs

    用于验证job指定的输出,例如保证job提交之前输出目录不能存在。否则,输出可能覆盖(看具体配置)。

    GetRecordWriter

    方法返回RecordWriter的实现,来序列化键值对到输出,输出可以是FileSystem对象。

    GetOutputCommiter

    Job的输出提交者在初始化期间设置每个任务,根据成功完成的状态提交(commit,区别于submit)任务,成功或其它状态,完成时都会清除任务。对基于文件的输出,FileOutputCommittter可以处理所有繁重的工作。它会对每个map任务创建临时输出目录,把成功的任务的输出移动到最终的输出目录。

     

    RecordWriter

    RecordWriter抽象类把键值对写到文件系统或另外的输出。与RecordReader不同,它没有初始化阶段。然而,可用构造器在需要的时候设置record writer。构造期间任何参数都能传入,因为record writer实例的创建是通过OutputFormat.getRecordWriter

     

    此类包含两个方法:

    Write

    这个方法由框架对每个要写的键值对调用。这个方法的实现很大程度上取决于你的使用。下面的例子中,我们展示怎样把键值对写到外部的内存键值存储,而不是文件系统。

    Close

    当处理完键值对时,框架调用这个方法。可以释放文件句柄,关闭跟其它服务的连接,或清除需要清除的任务。

    Generating Data

    Pattern Description

    生成数据模式很有趣,因为不是从外面加载数据,它快速,并行地产生数据。

    Intent

    你需要从零开始生成大量数据。

    Motivation

    这个模式最大的特点是,它不加载数据。用这种模式,你可以生成数据并存到分布式文件系统上。

    生成数据不太常见。通常,你可以生成一批数据并反复使用。当需要生成数据时,MapReduce是一个很适合的工具。

    这种模式最常见的使用案例是生成随机数据。构建一些具有代表性的数据集对大规模的测试非常有用,尽管这种测试针对的实际数据量很小。它对创建在一定范围内用来研究理论的证明的“toy domains”也很有用。

     

    生成随机数据常用来作为基准测试的一部分,例如常用的TeraGen/TeraSort  DFSIO

    不幸的是,这种模式的实现用hadoop不是很简单,因为底层的框架对一个map任务指定一个输入分片并对记录指定一个map方法。在这种模式下,没有输入分片,也没有记录的概念,所以必须欺骗框架认为是有分片,有记录。

     

    Structure

    hadoop实现这种模式,要实现一个自定义的inputFormat并让一个RecordReader生成随机数据。Map方法对数据源的处理完全忽略,所以构建非常迅速,而不是加载hdfs上的数据文件,多数情况下,可使用identity mapper,但如果想在map任务做一些后处理,或者立即分析它。见图7-1.

    这种模式是map-only的。

    ·InputFormat创建虚拟分片,分片数量可以配置。

    ·RecordReader拿假分片生成随机记录数据。有些时候,在输入分片里可以指定一些信息来告诉record reader生成什么。例如,要生成随机的日期/时间数据,一个输入分片代表一个小时段。

     

    ·多数情况下使用identitymapper原样写出输入数据。

    Figure 7-1. The structure of the generating data pattern

    Notice:实现这种模式比较懒的方式是把job的每个假输入文件塞进单条编造的记录。然后,就可以使用通用InputFormat RecordReadermap方法里生成数据。

    Consequences

    每个mapper输出一个包含随机数据的文件。

    Resemblances

    sqlpig有几种创建随机数据的方式,但都不够简洁和有说服力。

    Performance analysis

    这里主要考虑的性能方面的问题是需要多少map任务来生成数据。通常,map任务越多,生成数据量越快,因为充分利用了集群的并行性。然而,map启动数多于map slots数时意义不大,他们一直在做同样的事情。

    Generating Data Examples

    Generating random StackOverflow comments

    为了生成随机stackOverflow数据,我们使用1000个单词的list生成随机短评。我们需要生成一个随机分数,row iduserid,和创建时间。

     

    Driver code。解析四个命令行参数配置job。设置自定义的输入格式,然后进一步调用静态方法配置。所有输出写到给定的输出目录。使用identity mapper,设置reduce数量为0从而禁用reduce阶段。

     

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int numMapTasks = Integer.parseInt(args[0]);
        int numRecordsPerTask = Integer.parseInt(args[1]);
        Path wordList = new Path(args[2]);
        Path outputDir = new Path(args[3]);
        Job job = new Job(conf, "RandomDataGenerationDriver");
        job.setJarByClass(RandomDataGenerationDriver.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(RandomStackOverflowInputFormat.class);
        RandomStackOverflowInputFormat.setNumMapTasks(job, numMapTasks);
        RandomStackOverflowInputFormat.setNumRecordPerTask(job,
               numRecordsPerTask);
        RandomStackOverflowInputFormat.setRandomWordList(job, wordList);
        TextOutputFormat.setOutputPath(job, outputDir);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 2);
    }
     



    InputSplit code.FakeInputSplit类简单的继承自InputSplit并实现writable.没有覆盖任何方法。用于欺骗框架指派一个任务生成随机数据。

     

    public static class FakeInputSplit extends InputSplit implements Writable {
        publicvoid readFields(DataInput arg0) throws IOException {
        }
        publicvoid write(DataOutput arg0) throws IOException {
        }
        publiclong getLength() throws IOException, InterruptedException {
           return 0;
        }
        public String[] getLocations() throws IOException, InterruptedException {
           returnnew String[0];
        }
    }

    inputFormat code。输入格式有两个目的:返回框架生成map任务需要的输入分片,然后为map任务创建RandomStackOverflowRecordReader。覆盖getSplits方法返回一个配置的数量的FakeInputSplit分片数。这个数量是从配置取的。当框架调用createRecordReader,一个

    RandomStackOverflowRecordReader实例化,初始化,返回。

     

    public static class RandomStackOverflowInputFormat extends
           InputFormat<Text, NullWritable> {
        public static final String NUM_MAP_TASKS = "random.generator.map.tasks";
        public static final String NUM_RECORDS_PER_TASK = "random.generator.num.records.per.map.task";
        public static final String RANDOM_WORD_LIST = "random.generator.random.word.file";
        public List<InputSplit> getSplits(JobContext job) throws IOException {
           // Get the number of map tasks configured for
           int numSplits = job.getConfiguration().getInt(NUM_MAP_TASKS, -1);
           // Create a number of input splits equivalent to the number of tasks
           ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
           for (int i = 0; i < numSplits; ++i) {
               splits.add(new FakeInputSplit());
           }
           return splits;
        }
        public RecordReader<Text, NullWritable> createRecordReader(
               InputSplit split, TaskAttemptContext context)
               throws IOException, InterruptedException {
           // Create a new RandomStackOverflowRecordReader and initialize it
           RandomStackOverflowRecordReader rr = new RandomStackOverflowRecordReader();
           rr.initialize(split, context);
           return rr;
        }
        public static void setNumMapTasks(Job job, int i) {
           job.getConfiguration().setInt(NUM_MAP_TASKS, i);
        }
        public static void setNumRecordPerTask(Job job, int i) {
           job.getConfiguration().setInt(NUM_RECORDS_PER_TASK, i);
        }
        public static void setRandomWordList(Job job, Path file) {
           DistributedCache.addCacheFile(file.toUri(), job.getConfiguration());
        }
    }
     



    recordReader codeRecord reader是数据真正生成的地方。在初始化FakeInputSplit时给出,但简单的忽视它。从job配置中抽取要创建的记录数,从分布式缓存获得随机单词的list。对每次nextKeyValue调用,使用简单的随机数生成器创建一条随机记录。评论体由一个帮助方法从list随机选择单词来生成。单词数量从130个。计数器也自增,为了跟踪生成了多少条记录。一旦所有的记录生成完成,record reader返回false,给框架mapper已经没有输入信息的信号。

     

    public static class RandomStackOverflowRecordReader extends
           RecordReader<Text, NullWritable> {
        private int numRecordsToCreate = 0;
        private int createdRecords = 0;
        private Text key = new Text();
        private NullWritable value = NullWritable.get();
        private Random rndm = new Random();
        private ArrayList<String> randomWords = new ArrayList<String>();
        // This object will format the creation date string into a Date
        // object
        private SimpleDateFormat frmt = new SimpleDateFormat(
               "yyyy-MM-dd'T'HH:mm:ss.SSS");
        public void initialize(InputSplit split, TaskAttemptContext context)
               throws IOException, InterruptedException {
           // Get the number of records to create from the configuration
           this.numRecordsToCreate = context.getConfiguration().getInt(
                  NUM_RECORDS_PER_TASK, -1);
           // Get the list of random words from the DistributedCache
           URI[] files = DistributedCache.getCacheFiles(context
                  .getConfiguration());
           // Read the list of random words into a list
           BufferedReader rdr = new BufferedReader(new FileReader(
                  files[0].toString()));
           String line;
           while ((line = rdr.readLine()) != null) {
               randomWords.add(line);
           }
           rdr.close();
        }
        public boolean nextKeyValue() throws IOException, InterruptedException {
           // If we still have records to create
           if (createdRecords < numRecordsToCreate) {
               // Generate random data
               int score = Math.abs(rndm.nextInt()) % 15000;
               int rowId = Math.abs(rndm.nextInt()) % 1000000000;
               int postId = Math.abs(rndm.nextInt()) % 100000000;
               int userId = Math.abs(rndm.nextInt()) % 1000000;
               String creationDate = frmt.format(Math.abs(rndm.nextLong()));
               // Create a string of text from the random words
               String text = getRandomText();
               String randomRecord = "<row Id="" + rowId + "" PostId=""
                      + postId + "" Score="" + score + "" Text="" + text
                      + "" CreationDate="" + creationDate + "" UserId"="
                      + userId + "" />";
               key.set(randomRecord);
               ++createdRecords;
               return true;
           } else {
               // We are done creating records
               return false;
           }
        }
        private String getRandomText() {
           StringBuilder bldr = new StringBuilder();
           int numWords = Math.abs(rndm.nextInt()) % 30 + 1;
           for (int i = 0; i < numWords; ++i) {
               bldr.append(randomWords.get(Math.abs(rndm.nextInt())
                      % randomWords.size())
                      + " ");
           }
           return bldr.toString();
        }
        public Text getCurrentKey() throws IOException, InterruptedException {
           return key;
        }
        public NullWritable getCurrentValue() throws IOException,
               InterruptedException {
           return value;
        }
        public float getProgress() throws IOException, InterruptedException {
           return (float) createdRecords / (float) numRecordsToCreate;
        }
        public void close() throws IOException {
           // nothing to do here...
        }
    }


     

    External Source Output

    Pattern Description

    正如本章早些时候说的,外部源输出模式写到hadoop系统之外。

    Intent

    你想把MapReduce的输出写到远程地点。

    Motivation

    使用这种模式,我们能够把从MapReduce框架输出的数据直接给一个外部源。这对直接加载数据到系统而不是通过中转数据的方式极其有用。这种模式跳过存储到文件系统这一步而直接发送到要去的地方。使用MapReduce大量并行导入外部源的方式有使用的地方。

     

    数据是并行写出的。由于使用外部源做输出,你需要保证目标系统能够处理这种并行度,承受所有打开的连接。

    Structure

    7-2展示了外部源输出结构,解释如下:

    ·OutputFormatjob提交前验证job配置指定的输出。这也是保证外部源完全可用的好时机。如果当把数据提交给外部系统时发现它并不能用,这是不好的。这个方法也负责RecordWriter方法的创建。

    ·RecordWriter把所有的键值对写到外部源。它的实现根据不同的外部源而不同。对象构建时,使用外部源的api建立需要的连接。这些连接用于mapreduce任务写数据。

    Figure 7-2. The structure of the external source output pattern

    Consequences

    输出数据已经发送到外部源,并且外部源成功加载数据。

     

    Notice:注意任务失败可能发生,如果发生了,任何write方法里的键值对都不能恢复。典型的MapReduce job里,临时输出写到文件系统里。在失败的情况下,输出被丢弃。当写到一个外部目录时,会在流中接收数据。如果任务失败,外部源不会自动识别并丢弃所有从这个任务接收的数据。如果这是不可接受的,考虑使用自定义的OutputCommitter写临时输出到文件系统。

    Performance analysis

    MapReduce角度看,没什么可担心的,因为mapreduce都是通用的。不过要注意数据的接收方要能处理平行连接。运行1000个任务写到一个关系数据库里不是很好。要避免这种情况,你可能要让每个reducer处理多一点的数据。如果目标系统对并行支持的很好,这也可能不是问题。例如,写到一个分区数据库,可以把每个reducer写到指定的数据库实例。(oracle RAC?)

     

    External Source Output Example

    Writing to Redis instances

    这个例子是从MapReduce并行写入多个redis实例的基本方式。Redis是一个开源内存键值存储数据库。通常作为数据结构服务器,键可以是stringhashlistset,和sorted setRedis用标准c写的,能在多数posix系统下工作,例如linux,不需要外部的依赖。

     

    为了跟hadoop框架一起工作,jedis用于跟redis的交流。Jedis是开源的“blazingly small and sane Redis java 客户端”。还有其它语言编写的redis客户端可以在网上找到。

     

    这个例子没有实际的分析业务,本章剩下的都是这样。关注于用自定义的fileOutputFormat怎样把数据存储到hdfs并存到外部数据系统。这个例子里,stackOverflow用户数据写到数量可配置的redis集群,数据是用户到声誉值的映射。这些映射数据根据redishash平均的随机分发。

     

    Redishash是一种stirng 字段到string值的映射,跟javahashmap类似。每个hash都有一个key标识它。每个hash可保存超过40亿的键值对。

     

    问题:给出用户信息数据,并行随机分发用户-声誉值的映射数据到一个数量可配置的redis集群。

     

    outputFormat codeRedisHashOutputFormat类负责在提交到jobtracker之前创建和验证job配置。也会创建RecordWriter序列化输出键值对。通常写到hdfs,但我们这里不是,一会会看到。

     

    输出格式包含了必须被驱动代码设置的配置变量,来保证已经有了job运行需要的所有信息。

    这里,有几个推测开发人员需要用到的静态方法。这个输出格式接受一些redis实例主机作为csv结构和一个写所有输出的redis hash keycheckOutputSpecs 方法里,运行之前要保证两个参数已被设置,因为没有他们job会失败。这也是你要验证配置的地方。

     

    getRecordWriter方法用于后面为mapreduce任务创建RecordWriter实例。这里我们靠RedisHashRecordWriter得到需要的配置变量并返回一个新的实例。这个Record writerRedisHashOutputFormat的子类,不需要但是约定的东西。

     

    这个输出格式的最后一个方法是getOutputCommitter。在任务失败需要重跑之前框架用它管理任何临时输出。对于这个实现,我们通常不关心任务是否失败和需要重新执行。只要job完成就可以。输出提交者是框架需要的,但NullOutputFormat包含的输出提交者的实现什么也不做。

     

    public static class RedisHashOutputFormat extends OutputFormat<Text, Text> {
        public static final String REDIS_HOSTS_CONF = "mapred.redishashoutputformat.hosts";
        public static final String REDIS_HASH_KEY_CONF = "mapred.redishashinputformat.key";
        public static void setRedisHosts(Job job, String hosts) {
           job.getConfiguration().set(REDIS_HOSTS_CONF, hosts);
        }
        public static void setRedisHashKey(Job job, String hashKey) {
           job.getConfiguration().set(REDIS_HASH_KEY_CONF, hashKey);
        }
        public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
               throws IOException, InterruptedException {
           return new RedisHashRecordWriter(job.getConfiguration().get(
                  REDIS_HASH_KEY_CONF), job.getConfiguration().get(
                  REDIS_HOSTS_CONF));
        }
        public void checkOutputSpecs(JobContext job) throws IOException {
           String hosts = job.getConfiguration().get(REDIS_HOSTS_CONF);
           if (hosts == null || hosts.isEmpty()) {
               throw new IOException(REDIS_HOSTS_CONF
                      + " is not set in configuration.");
           }
           String hashKey = job.getConfiguration().get(REDIS_HASH_KEY_CONF);
           if (hashKey == null || hashKey.isEmpty()) {
               throw new IOException(REDIS_HASH_KEY_CONF
                      + " is not set in configuration.");
           }
        }
        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
               throws IOException, InterruptedException {
           return (new NullOutputFormat<Text, Text>())
                  .getOutputCommitter(context);
        }
        public static class RedisHashRecordWriter extends
               RecordWriter<Text, Text> {
           // code in next section
        }
    }


    RecordReader code RedisHashRecordWriter类通过jedis客户端处理redis的连接并写数据。每个键值对随机写到redis实例,这种写是在整个集群内平均分发的,构造器保存要写的hash key并创建新的jedis实例。

    然后连接上jedis实例并跟一个整数做映射。Write方法会用这个映射得到指定的jedis实例。用keyhash码对redis实例个数取模。这个模值决定了键值对要发送的jedis实例。Jedis实例在close方法里关闭连接。

     

    public static class RedisHashRecordWriter extends RecordWriter<Text, Text> {
        private HashMap<Integer, Jedis> jedisMap = new HashMap<Integer, Jedis>();
        private String hashKey = null;
        public RedisHashRecordWriter(String hashKey, String hosts) {
           this.hashKey = hashKey;
           // Create a connection to Redis for each host
           // Map an integer 0-(numRedisInstances - 1) to the instance
           int i = 0;
           for (String host : hosts.split(",")) {
               Jedis jedis = new Jedis(host);
               jedis.connect();
               jedisMap.put(i, jedis);
               ++i;
           }
        }
        public void write(Text key, Text value) throws IOException,
               InterruptedException {
           // Get the Jedis instance that this key/value pair will be
           // written to
           Jedis j = jedisMap.get(Math.abs(key.hashCode()) % jedisMap.size());
           // Write the key/value pair
           j.hset(hashKey, key.toString(), value.toString());
        }
        public void close(TaskAttemptContext context) throws IOException,
               InterruptedException {
           // For each jedis instance, disconnect it
           for (Jedis jedis : jedisMap.values()) {
               jedis.disconnect();
           }
        }
    }


    Mapper code。较简单。Userid和声誉值从记录获取然后输出。Outputformat会做大部分的工作,允许mapper重用多次去写任何你想要的东西到redis hash里。

     

    public static class RedisOutputMapper extends
           Mapper<Object, Text, Text, Text> {
        private Text outkey = new Text();
        private Text outvalue = new Text();
     
        public void map(Object key, Text value, Context context)
               throws IOException, InterruptedException {
           Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                  .toString());
           String userId = parsed.get("Id");
           String reputation = parsed.get("Reputation");
           // Set our output key and values
           outkey.set(userId);
           outvalue.set(reputation);
           context.write(outkey, outvalue);
        }
    }


    Driver code。驱动代码解析命令行参数,调用静态方法设置要写到redis的数据。

     

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path inputPath = new Path(args[0]);
        String hosts = args[1];
        String hashName = args[2];
        Job job = new Job(conf, "Redis Output");
        job.setJarByClass(RedisOutputDriver.class);
        job.setMapperClass(RedisOutputMapper.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, inputPath);
        job.setOutputFormatClass(RedisHashOutputFormat.class);
        RedisHashOutputFormat.setRedisHosts(job, hosts);
        RedisHashOutputFormat.setRedisHashKey(job, hashName);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        int code = job.waitForCompletion(true) ? 0 : 2;
        System.exit(code);
    }
    


  • 相关阅读:
    Java Web系统经常使用的第三方接口
    Direct UI
    Python 分析Twitter用户喜爱的推文
    数据挖掘十大经典算法(9) 朴素贝叶斯分类器 Naive Bayes
    利用Excel批量高速发送电子邮件
    普林斯顿大学数学系的崛起
    Node.js学习
    映射 SQL 和 Java 类型
    Nutch配置
    OGNL
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276250.html
Copyright © 2011-2022 走看看