zoukankan      html  css  js  c++  java
  • 数据压缩、数据倾斜join操作

    1、数据压缩发生阶段

    操作 压缩
    数据源 》数据传输 数据压缩
    mapper map端输出压缩
    》数据传输 数据压缩
    reducer reduce端输出压缩
    》数据传输 数据压缩
    结果数据

    设置map端输出压缩:
    1)开启压缩
    conf.setBoolean

    //开启map端输出压缩
    conf.setBoolean("mapreduce.map.output.compress",true);

    2)设置具体压缩编码
    conf.setClass

    //设置压缩方式
    //conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
    conf.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class);

    设置reduce端输出压缩:
    1)设置reduce输出压缩
    FileOutputFormat.setCompressOutput

    //设置reduce端输出压缩
    FileOutputFormat.setCompressOutput(job,true);

    2)设置具体压缩编码
    FileOutputFormat.setOutputCompressorClass

    //设置压缩方式
    //FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);
    //FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    FileOutputFormat.setOutputCompressorClass(job,DefaultCodec.class);

    hive数据仓库:mapreduce 用hsql处理大数据

    2、压缩编码使用场景

    1-> Gzip压缩方式

    压缩率比较高,并且压缩解压缩速度很快
    hadoop自身支持的压缩方式,用gzip格式处理数据就像直接处理文本数据是完全一样
    的;
    在linux系统自带gzip命令,使用很方便简洁
    不支持split
    使用每个文件压缩之后大小需要在128M以下(块大小)
    200M-》设置块大小

    2->LZO压缩方式

    压缩解压速度比较快并且,压缩率比较合理
    支持split
    在linux系统不可以直接使用,但是可以进行安装
    压缩率比gzip和bzip2要弱,hadoop本身不支持
    需要安装

    3->Bzip2压缩方式

    支持压缩,具有很强的压缩率。hadoop本身支持
    linux中可以安装
    压缩解压缩速度很慢

    4->Snappy压缩方式

    压缩解压缩速度很快,而且有合理的压缩率
    不支持split

    3、数据倾斜

    reduce join

    4、Hadoop中有哪些组件

    HDFS:数据的分布式存储
    MapReduce:数据的分布式计算
    Yarn:资源调度(cpu/内存…)
    Yarn节点:resourceManager
    nodeManager

    5、进行两个表的拼接

    DistributedCacheMapper类

    package com.hsiehchou.mapjoin;
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.HashMap;
    /**
    * mapjoin
    * 完成两张表数据的关联操作
    */
    public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    HashMap<String, String> pdMap = new HashMap<String, String>();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
    //1.加载缓存文件
    URI[] cacheFiles = context.getCacheFiles();
    BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(cacheFiles[0].getPath()), "UTF-8"));
    //这里可以将文件放在当前项目文件下,如果不放就用上面的那两句
    //BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "UTF-8"));
    String line;
    //2.判断缓存文件不为空
    while(StringUtils.isNotEmpty(line = br.readLine())){
    //切割数据
    String[] fields = line.split(" ");
    //缓冲 到 集合; 商品ID 商品名
    pdMap.put(fields[0],fields[1]);
    }
    br.close();
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    //1.获取数据
    String line = value.toString();
    //2.切分数据
    String[] fields = line.split(" ");
    //3.获取商品的pid,商品名称
    String pid = fields[1];
    String pName = pdMap.get(pid);
    //4.拼接
    line = line + " " + pName;
    //5.输出
    context.write(new Text(line),NullWritable.get());
    }
    }

    DistributedCacheDriver类

    package com.hsiehchou.mapjoin;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    public class DistributedCacheDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
    //创建job任务
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    //指定jar包位置
    job.setJarByClass(DistributedCacheDriver.class);
    //关联使用的Mapper
    job.setMapperClass(DistributedCacheMapper.class);
    //设置最终的输出的数据类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    //设置数据输入的路径
    FileInputFormat.setInputPaths(job,new Path("e://test//table//in"));
    //设置数据输出的路径
    FileOutputFormat.setOutputPath(job,new Path("e://test//table//out"));
    //加载缓存数据
    job.addCacheFile(new URI("file:///e:/test/inputcache/pd.txt"));
    //注意:没有跑reducer 需要指定reduceTask为0
    job.setNumReduceTasks(0);
    //提交任务
    boolean rs = job.waitForCompletion(true);
    System.exit(rs? 0:1);
    }
    }

    本地模式测试

    URI[] cacheFiles = context.getCacheFiles();
    BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(cacheFiles[0].getPath()), "UTF-8"));

    集群模式时

    conf.set("mapreduce.framework.name", "yarn");yarn模式
    job.addCacheFile(new URI("hdfs:///test2/pd.txt"));//添加hdfs文件做缓存
  • 相关阅读:
    STM32L476的RTC使用问题记录
    python数据分析之:时间序列二
    python+NLTK 自然语言学习处理七:N-gram标注
    python数据分析之:时间序列一
    如何在ubuntun中安装intellij idea 2018并破解
    python+NLTK 自然语言学习处理六:分类和标注词汇一
    python数据分析之:数据聚合与分组运算
    500 Lines or Less: A Template Engine(模板引擎)
    python+NLTK 自然语言学习处理五:词典资源
    Django之博客系统:在网站中分享内容(一)
  • 原文地址:https://www.cnblogs.com/hsiehchou/p/10408064.html
Copyright © 2011-2022 走看看