zoukankan      html  css  js  c++  java
  • 04_离线计算系统_第4天(mapreduce加强)

    课程大纲(MAPREDUCE详解)

    MapReduce快速入门

    如何理解map、reduce计算模型

    Mapreudce程序运行演示

    Mapreduce编程规范及示例编写

    Mapreduce程序运行模式及debug方法

    MapReduce高级特性

    Mapreduce程序的核心机制

    MapReduce的序列化框架

    MapReduce的排序实现

    MapReduce的分区机制及自定义

    Mapreduce的数据压缩

    Mapreduce与yarn的结合

    Mapreduce编程案例

     

    Mapreduce 参数优化

     

     

    目标:

    掌握mapreduce分布式运算框架的编程思想

    掌握mapreduce常用算法的编程套路

    掌握mapreduce分布式运算框架的运行机制,具备一定自定义开发的能力

     

     

     

     

     

     

     

     

    流量统计相关需求

    1、对流量日志中的用户统计总上、下行流量

    技术点: 自定义javaBean用来在mapreduce中充当value

    注意: javaBean要实现Writable接口,实现两个方法

    //序列化,将对象的字段信息写入输出流

    @Override

    public void write(DataOutput out) throws IOException {

     

    out.writeLong(upflow);

    out.writeLong(downflow);

    out.writeLong(sumflow);

     

    }

     

    //反序列化,从输入流中读取各个字段信息

    @Override

    public void readFields(DataInput in) throws IOException {

    upflow = in.readLong();

    downflow = in.readLong();

    sumflow = in.readLong();

     

    }

     

     

    2、统计流量且按照流量大小倒序排序

    技术点:这种需求,用一个mapreduce -job 不好实现,需要两个mapreduce -job

    第一个job负责流量统计,跟上题相同

    第二个job读入第一个job的输出,然后做排序

    要将flowBean作为map的key输出,这样mapreduce就会自动排序

         此时,flowBean要实现接口WritableComparable

         要实现其中的compareTo()方法,方法中,我们可以定义倒序比较的逻辑

     

     

    3、统计流量且按照手机号的归属地,将结果数据输出到不同的省份文件中

    技术点:自定义Partitioner

    @Override

    public int getPartition(Text key, FlowBean value, int numPartitions) {

     

    String prefix = key.toString().substring(0,3);

    Integer partNum = pmap.get(prefix);

     

    return (partNum==null?4:partNum);

    }

     

    自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task

    job.setNumReduceTasks(5);

     

    注意:如果reduceTask的数量>= getPartition的结果数  ,则会多产生几个空的输出文件part-r-000xx

    如果     1<reduceTask的数量<getPartition的结果数 ,则有一部分分区数据无处安放,会Exception!!!

    如果 reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000

     

     

    社交粉丝数据分析

    以下是qq的好友列表数据,冒号前是一个用,冒号后是该用户的所有好友(数据中的好友关系是单向的)

    A:B,C,D,F,E,O

    B:A,C,E,K

    C:F,A,D,I

    D:A,E,F,L

    E:B,C,D,M,L

    F:A,B,C,D,E,O,M

    G:A,C,D,E,F

    H:A,C,D,E,O

    I:A,O

    J:B,O

    K:A,C,D

    L:D,E,F

    M:E,F,G

    O:A,H,I,J

     

    求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?

    解题思路:

    第一步  

    map

    读一行   A:B,C,D,F,E,O

    输出    <B,A><C,A><D,A><F,A><E,A><O,A>

    在读一行   B:A,C,E,K

    输出   <A,B><C,B><E,B><K,B>

     

     

    REDUCE

    拿到的数据比如<C,A><C,B><C,E><C,F><C,G>......

    输出:  

    <A-B,C>

    <A-E,C>

    <A-F,C>

    <A-G,C>

    <B-E,C>

    <B-F,C>.....

     

     

     

    第二步

    map

    读入一行<A-B,C>

    直接输出<A-B,C>

     

    reduce

    读入数据  <A-B,C><A-B,F><A-B,G>.......

    输出: A-B  C,F,G,.....

     

    扩展:求互粉的人!!!!

     

    倒排索引建立

    需求:有大量的文本(文档、网页),需要建立搜索索引

     

     

     

     

     

     

     

     

     

     

    1. 自定义inputFormat

    1.1 需求

    无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案

     

    1.2 分析

    小文件的优化无非以下几种方式:

    1、 在数据采集的时候,将小文件或小批数据合成大文件再上传HDFS

    2、 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并

    3、 在mapreduce处理时,可采用combineInputFormat提高效率

     

    1.3 实现

    本节实现的是上述第二种方式

    程序的核心机制:

    自定义一个InputFormat

    改写RecordReader,实现一次读取一个完整文件封装为KV

    在输出时使用SequenceFileOutPutFormat输出合并文件

     

    代码如下:

    自定义InputFromat

    public class WholeFileInputFormat extends

    FileInputFormat<NullWritable, BytesWritable> {

    //设置每个小文件不可分片,保证一个小文件生成一个key-value键值对

    @Override

    protected boolean isSplitable(JobContext context, Path file) {

    return false;

    }

     

    @Override

    public RecordReader<NullWritable, BytesWritable> createRecordReader(

    InputSplit split, TaskAttemptContext context) throws IOException,

    InterruptedException {

    WholeFileRecordReader reader = new WholeFileRecordReader();

    reader.initialize(split, context);

    return reader;

    }

    }

     

     

    自定义RecordReader

    class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;

    private Configuration conf;

    private BytesWritable value = new BytesWritable();

    private boolean processed = false;

     

    @Override

    public void initialize(InputSplit split, TaskAttemptContext context)

    throws IOException, InterruptedException {

    this.fileSplit = (FileSplit) split;

    this.conf = context.getConfiguration();

    }

     

    @Override

    public boolean nextKeyValue() throws IOException, InterruptedException {

    if (!processed) {

    byte[] contents = new byte[(int) fileSplit.getLength()];

    Path file = fileSplit.getPath();

    FileSystem fs = file.getFileSystem(conf);

    FSDataInputStream in = null;

    try {

    in = fs.open(file);

    IOUtils.readFully(in, contents, 0, contents.length);

    value.set(contents, 0, contents.length);

    } finally {

    IOUtils.closeStream(in);

    }

    processed = true;

    return true;

    }

    return false;

    }

     

    @Override

    public NullWritable getCurrentKey() throws IOException,

    InterruptedException {

    return NullWritable.get();

    }

     

    @Override

    public BytesWritable getCurrentValue() throws IOException,

    InterruptedException {

    return value;

    }

     

    @Override

    public float getProgress() throws IOException {

    return processed ? 1.0f : 0.0f;

    }

     

    @Override

    public void close() throws IOException {

    // do nothing

    }

    }

     

    定义mapreduce处理流程

    public class SmallFilesToSequenceFileConverter extends Configured implements

    Tool {

    static class SequenceFileMapper extends

    Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    private Text filenameKey;

     

    @Override

    protected void setup(Context context) throws IOException,

    InterruptedException {

    InputSplit split = context.getInputSplit();

    Path path = ((FileSplit) split).getPath();

    filenameKey = new Text(path.toString());

    }

     

    @Override

    protected void map(NullWritable key, BytesWritable value,

    Context context) throws IOException, InterruptedException {

    context.write(filenameKey, value);

    }

    }

     

    @Override

    public int run(String[] args) throws Exception {

    Configuration conf = new Configuration();

    System.setProperty("HADOOP_USER_NAME", "hdfs");

    String[] otherArgs = new GenericOptionsParser(conf, args)

    .getRemainingArgs();

    if (otherArgs.length != 2) {

    System.err.println("Usage: combinefiles <in> <out>");

    System.exit(2);

    }

     

    Job job = Job.getInstance(conf,"combine small files to sequencefile");

    // job.setInputFormatClass(WholeFileInputFormat.class);

    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(BytesWritable.class);

    job.setMapperClass(SequenceFileMapper.class);

    return job.waitForCompletion(true) ? 0 : 1;

    }

     

    public static void main(String[] args) throws Exception {

    int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(),

    args);

    System.exit(exitCode);

     

    }

    }

     

     

     

     

    2. 自定义outputFormat

    2.1 需求

    现有一些原始日志需要做增强解析处理,流程:

    1、 从原始日志文件中读取数据

    2、 根据日志中的一个URL字段到外部知识库中获取信息增强到原始日志

    3、 如果成功增强,则输出到增强结果目录;如果增强失败,则抽取原始数据中URL字段输出到待爬清单目录

     

     

    2.2 分析

    程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现

     

    2.3 实现

    实现要点:

    1、 在mapreduce中访问外部资源

    2、 自定义outputformat,改写其中的recordwriter改写具体输出数据的方法write()

     

    代码实现如下:

    数据库获取数据的工具

    public class DBLoader {

     

    public static void dbLoader(HashMap<String, String> ruleMap) {

    Connection conn = null;

    Statement st = null;

    ResultSet res = null;

     

    try {

    Class.forName("com.mysql.jdbc.Driver");

    conn = DriverManager.getConnection("jdbc:mysql://hdp-node01:3306/urlknowledge", "root", "root");

    st = conn.createStatement();

    res = st.executeQuery("select url,content from urlcontent");

    while (res.next()) {

    ruleMap.put(res.getString(1), res.getString(2));

    }

    } catch (Exception e) {

    e.printStackTrace();

     

    } finally {

    try{

    if(res!=null){

    res.close();

    }

    if(st!=null){

    st.close();

    }

    if(conn!=null){

    conn.close();

    }

     

    }catch(Exception e){

    e.printStackTrace();

    }

    }

    }

     

     

    public static void main(String[] args) {

    DBLoader db = new DBLoader();

    HashMap<String, String> map = new HashMap<String,String>();

    db.dbLoader(map);

    System.out.println(map.size());

    }

    }

     

     

    自定义一个outputformat

    public class LogEnhancerOutputFormat extends FileOutputFormat<Text, NullWritable>{

     

     

    @Override

    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {

     

     

    FileSystem fs = FileSystem.get(context.getConfiguration());

    Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/enhanced.log");

    Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/tocrawl.log");

     

    FSDataOutputStream enhanceOut = fs.create(enhancePath);

    FSDataOutputStream toCrawlOut = fs.create(toCrawlPath);

     

     

    return new MyRecordWriter(enhanceOut,toCrawlOut);

    }

     

     

     

    static class MyRecordWriter extends RecordWriter<Text, NullWritable>{

     

    FSDataOutputStream enhanceOut = null;

    FSDataOutputStream toCrawlOut = null;

     

    public MyRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) {

    this.enhanceOut = enhanceOut;

    this.toCrawlOut = toCrawlOut;

    }

     

    @Override

    public void write(Text key, NullWritable value) throws IOException, InterruptedException {

     

    //有了数据,你来负责写到目的地  —— hdfs

    //判断,进来内容如果是带tocrawl的,就往待爬清单输出流中写 toCrawlOut

    if(key.toString().contains("tocrawl")){

    toCrawlOut.write(key.toString().getBytes());

    }else{

    enhanceOut.write(key.toString().getBytes());

    }

     

    }

     

    @Override

    public void close(TaskAttemptContext context) throws IOException, InterruptedException {

     

    if(toCrawlOut!=null){

    toCrawlOut.close();

    }

    if(enhanceOut!=null){

    enhanceOut.close();

    }

     

    }

     

     

    }

    }

     

    开发mapreduce处理流程

    /**

     * 这个程序是对每个小时不断产生的用户上网记录日志进行增强(将日志中的url所指向的网页内容分析结果信息追加到每一行原始日志后面)

     *

     * @author

     *

     */

    public class LogEnhancer {

     

    static class LogEnhancerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

     

    HashMap<String, String> knowledgeMap = new HashMap<String, String>();

     

    /**

     * maptask在初始化时会先调用setup方法一次 利用这个机制,将外部的知识库加载到maptask执行的机器内存中

     */

    @Override

    protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException {

     

    DBLoader.dbLoader(knowledgeMap);

     

    }

     

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

     

    String line = value.toString();

     

    String[] fields = StringUtils.split(line, " ");

     

    try {

    String url = fields[26];

     

    // 对这一行日志中的url去知识库中查找内容分析信息

    String content = knowledgeMap.get(url);

     

    // 根据内容信息匹配的结果,来构造两种输出结果

    String result = "";

    if (null == content) {

    // 输往待爬清单的内容

    result = url + " " + "tocrawl ";

    } else {

    // 输往增强日志的内容

    result = line + " " + content + " ";

    }

     

    context.write(new Text(result), NullWritable.get());

    } catch (Exception e) {

     

    }

    }

     

    }

     

    public static void main(String[] args) throws Exception {

     

    Configuration conf = new Configuration();

     

    Job job = Job.getInstance(conf);

     

    job.setJarByClass(LogEnhancer.class);

     

    job.setMapperClass(LogEnhancerMapper.class);

     

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(NullWritable.class);

     

    // 要将自定义的输出格式组件设置到job中

    job.setOutputFormatClass(LogEnhancerOutputFormat.class);

     

    FileInputFormat.setInputPaths(job, new Path(args[0]));

     

    // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat

    // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

     

    job.waitForCompletion(true);

    System.exit(0);

     

    }

     

    }

    3. 自定义GroupingComparator

    3.1 需求

    有如下订单数据

    订单id

    商品id

    成交金额

    Order_0000001

    Pdt_01

    222.8

    Order_0000001

    Pdt_05

    25.8

    Order_0000002

    Pdt_03

    522.8

    Order_0000002

    Pdt_04

    122.4

    Order_0000003

    Pdt_01

    222.8

     

    现在需要求出每一个订单中成交金额最大的一笔交易

     

    3.2 分析

    1、利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce

    2、在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值

     

     

    3.3 实现

    自定义groupingcomparator

    /**

     * 用于控制shuffle过程中reduce端对kv对的聚合逻辑

     * @author duanhaitao@itcast.cn

     *

     */

    public class ItemidGroupingComparator extends WritableComparator {

     

    protected ItemidGroupingComparator() {

     

    super(OrderBean.class, true);

    }

     

     

    @Override

    public int compare(WritableComparable a, WritableComparable b) {

    OrderBean abean = (OrderBean) a;

    OrderBean bbean = (OrderBean) b;

     

    //将item_id相同的bean都视为相同,从而聚合为一组

    return abean.getItemid().compareTo(bbean.getItemid());

    }

    }

     

     

    定义订单信息bean

    /**

     * 订单信息bean,实现hadoop的序列化机制

     * @author duanhaitao@itcast.cn

     *

     */

    public class OrderBean implements WritableComparable<OrderBean>{

    private Text itemid;

    private DoubleWritable amount;

     

    public OrderBean() {

    }

    public OrderBean(Text itemid, DoubleWritable amount) {

    set(itemid, amount);

    }

     

    public void set(Text itemid, DoubleWritable amount) {

     

    this.itemid = itemid;

    this.amount = amount;

     

    }

     

    public Text getItemid() {

    return itemid;

    }

     

    public DoubleWritable getAmount() {

    return amount;

    }

     

    @Override

    public int compareTo(OrderBean o) {

    int cmp = this.itemid.compareTo(o.getItemid());

    if (cmp == 0) {

     

    cmp = -this.amount.compareTo(o.getAmount());

    }

    return cmp;

    }

     

    @Override

    public void write(DataOutput out) throws IOException {

    out.writeUTF(itemid.toString());

    out.writeDouble(amount.get());

     

    }

     

    @Override

    public void readFields(DataInput in) throws IOException {

    String readUTF = in.readUTF();

    double readDouble = in.readDouble();

     

    this.itemid = new Text(readUTF);

    this.amount= new DoubleWritable(readDouble);

    }

     

     

    @Override

    public String toString() {

    return itemid.toString() + " " + amount.get();

    }

    }

     

    编写mapreduce处理流程

    /**

     * 利用secondarysort机制输出每种item订单金额最大的记录

     * @author duanhaitao@itcast.cn

     *

     */

    public class SecondarySort {

     

    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{

     

    OrderBean bean = new OrderBean();

     

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

     

    String line = value.toString();

    String[] fields = StringUtils.split(line, " ");

     

    bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));

     

    context.write(bean, NullWritable.get());

     

    }

     

    }

     

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{

     

     

    //在设置了groupingcomparator以后,这里收到的kv数据 就是:  <1001 87.6>,null  <1001 76.5>,null  ....

    //此时,reduce方法中的参数key就是上述kv组中的第一个kv的key:<1001 87.6>

    //要输出同一个item的所有订单中最大金额的那一个,就只要输出这个key

    @Override

    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

    context.write(key, NullWritable.get());

    }

    }

     

     

    public static void main(String[] args) throws Exception {

     

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf);

     

    job.setJarByClass(SecondarySort.class);

     

    job.setMapperClass(SecondarySortMapper.class);

    job.setReducerClass(SecondarySortReducer.class);

     

     

    job.setOutputKeyClass(OrderBean.class);

    job.setOutputValueClass(NullWritable.class);

     

    FileInputFormat.setInputPaths(job, new Path(args[0]));

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    //指定shuffle所使用的GroupingComparator类

    job.setGroupingComparatorClass(ItemidGroupingComparator.class);

    //指定shuffle所使用的partitioner类

    job.setPartitionerClass(ItemIdPartitioner.class);

     

    job.setNumReduceTasks(3);

     

    job.waitForCompletion(true);

     

    }

     

    }

     

     

    4. Mapreduce中的DistributedCache应用

    4.1 Map端join案例

    4.1.1 需求

    实现两个“表”的join操作,其中一个表数据量小,一个表很大,这种场景在实际中非常常见,比如“订单日志” join “产品信息”

     

     

    4.1.2 分析

    --原理阐述

    适用于关联表中有小表的情形;

    可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果

    可以大大提高join操作的并发度,加快处理速度

     

    --示例:先在mapper类中预先定义好小表,进行join

    --并用distributedcache机制将小表的数据分发到每一个maptask执行节点,从而每一个maptask节点可以从本地加载到小表的数据,进而在本地即可实现join

     

    4.1.3 实现

    public class TestDistributedCache {

    static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{

    FileReader in = null;

    BufferedReader reader = null;

    HashMap<String,String> b_tab = new HashMap<String, String>();

    String localpath =null;

    String uirpath = null;

     

    //是在map任务初始化的时候调用一次

    @Override

    protected void setup(Context context) throws IOException, InterruptedException {

    //通过这几句代码可以获取到cache file的本地绝对路径,测试验证用

    Path[] files = context.getLocalCacheFiles();

    localpath = files[0].toString();

    URI[] cacheFiles = context.getCacheFiles();

     

     

    //缓存文件的用法——直接用本地IO来读取

    //这里读的数据是map task所在机器本地工作目录中的一个小文件

    in = new FileReader("b.txt");

    reader =new BufferedReader(in);

    String line =null;

    while(null!=(line=reader.readLine())){

     

    String[] fields = line.split(",");

    b_tab.put(fields[0],fields[1]);

     

    }

    IOUtils.closeStream(reader);

    IOUtils.closeStream(in);

     

    }

     

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

     

    //这里读的是这个map task所负责的那一个切片数据(在hdfs上)

     String[] fields = value.toString().split(" ");

     

     String a_itemid = fields[0];

     String a_amount = fields[1];

     

     String b_name = b_tab.get(a_itemid);

     

     // 输出结果  1001 98.9 banan

     context.write(new Text(a_itemid), new Text(a_amount + " " + ":" + localpath + " " +b_name ));

     

    }

    }

    public static void main(String[] args) throws Exception {

     

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf);

     

    job.setJarByClass(TestDistributedCache.class);

     

    job.setMapperClass(TestDistributedCacheMapper.class);

     

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(LongWritable.class);

     

    //这里是我们正常的需要处理的数据所在路径

    FileInputFormat.setInputPaths(job, new Path(args[0]));

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

     

    //不需要reducer

    job.setNumReduceTasks(0);

    //分发一个文件到task进程的工作目录

    job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));

     

    //分发一个归档文件到task进程的工作目录

    // job.addArchiveToClassPath(archive);

     

    //分发jar包到task节点的classpath下

    // job.addFileToClassPath(jarfile);

     

    job.waitForCompletion(true);

    }

    }

     

     

    5. Mapreduce的其他补充

    5.1 计数器应用

    在实际生产代码中,常常需要将数据处理过程中遇到的不合规数据行进行全局计数,类似这种需求可以借助mapreduce框架中提供的全局计数器来实现

    示例代码如下:

    public class MultiOutputs {

    //通过枚举形式定义自定义计数器

    enum MyCounter{MALFORORMED,NORMAL}

     

    static class CommaMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

     

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

     

    String[] words = value.toString().split(",");

     

    for (String word : words) {

    context.write(new Text(word), new LongWritable(1));

    }

    //对枚举定义的自定义计数器加1

    context.getCounter(MyCounter.MALFORORMED).increment(1);

    //通过动态设置自定义计数器加1

    context.getCounter("counterGroupa", "countera").increment(1);

    }

     

    }

     

     

     

     

     

     

    5.2 多job串联

    一个稍复杂点的处理逻辑往往需要多个mapreduce程序串联处理,多job的串联可以借助mapreduce框架的JobControl实现

     

    示例代码:

          ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());

            ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());

            ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());

           

            // 设置作业依赖关系

            cJob2.addDependingJob(cJob1);

            cJob3.addDependingJob(cJob2);

     

            JobControl jobControl = new JobControl("RecommendationJob");

            jobControl.addJob(cJob1);

            jobControl.addJob(cJob2);

            jobControl.addJob(cJob3);

     

            cJob1.setJob(job1);

            cJob2.setJob(job2);

            cJob3.setJob(job3);

     

            // 新建一个线程来运行已加入JobControl中的作业,开始进程并等待结束

            Thread jobControlThread = new Thread(jobControl);

            jobControlThread.start();

            while (!jobControl.allFinished()) {

                Thread.sleep(500);

            }

            jobControl.stop();

     

            return 0;

     

     

     

     

     

    5.3 Configuration对象高级应用

     

     

    6. mapreduce参数优化

    MapReduce重要配置参数

    11.1 资源相关参数

    (1) mapreduce.map.memory.mb: 一个Map Task可使用的资源上限(单位:MB),默认为1024如果Map Task实际使用的资源量超过该值,则会被强制杀死。

    (2) mapreduce.reduce.memory.mb: 一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。

    (3) mapreduce.map.java.opts: Map Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g.

    “-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc” (@taskid@会被Hadoop框架自动换为相应的taskid), 默认值: “”

    (4) mapreduce.reduce.java.opts: Reduce Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g.

    “-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”, 默认值: “”

    (5) mapreduce.map.cpu.vcores: 每个Map task可使用的最多cpu core数目, 默认值: 1

    (6) mapreduce.map.cpu.vcores: 每个Reduce task可使用的最多cpu core数目, 默认值: 1

     

    11.2 容错相关参数

    (1) mapreduce.map.maxattempts: 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

    (2) mapreduce.reduce.maxattempts:个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

    (3) mapreduce.map.failures.maxpercent: 当失败的Map Task失败比例超过该值为,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业扔认为成功。

    (4) mapreduce.reduce.failures.maxpercent: 当失败的Reduce Task失败比例超过该值为,整个作业则失败,默认值为0.

    (5) mapreduce.task.timeout: Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是300000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”

    11.3 本地运行mapreduce 作业

    设置以下几个参数:

    mapreduce.framework.name=local

    mapreduce.jobtracker.address=local

    fs.defaultFS=local

    11.4 效率和稳定性相关参数

    (1) mapreduce.map.speculative: 是否为Map Task打开推测执行机制,默认为false

    (2) mapreduce.reduce.speculative: 是否为Reduce Task打开推测执行机制,默认为false

    (3) mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:当同一个class同时出现在用户jar包和hadoop jar中时,优先使用哪个jar包中的class,默认为false,表示优先使用hadoop jar中的class。

    (4) mapreduce.input.fileinputformat.split.minsize: 每个Map Task处理的数据量(仅针对基于文件的Inputformat有效,比如TextInputFormat,SequenceFileInputFormat),默认为一个block大小,即 134217728。

     

    1、mapreduce框架的设计思想

    2、mapreduce框架中的程序实体角色:maptask reducetask mrappmaster

    3、mapreduce程序运行的整体流程

    4、mapreduce程序中maptask任务切片规划的机制(掌握整体逻辑流程,看day03_word文档中的“maptask并行度”)

    5、mapreduce程序提交的整体流程(看图:一坨 "客户端提交mr程序job的流程")

    6、编码:
    wordcount
    流量汇总统计(hadoop的序列化实现)
    流量汇总统计并按省份区分

  • 相关阅读:
    【大话设计模式】——浅谈设计模式基础
    mongodb入门安装
    Unity3D研究之多语言用中文做KEY
    java7新特性之Try-with-resources (TWR)
    ORACLE取周、月、季、年的開始时间和结束时间
    topas命令详解
    topas解析(AIX)
    top(topas),vmstat,iostat在linux和AIX操作系统下显示情况
    怎么检查网站的死链接呢?
    网站死链接检测查询工具
  • 原文地址:https://www.cnblogs.com/shan13936/p/13762962.html
Copyright © 2011-2022 走看看