zoukankan      html  css  js  c++  java
  • Hive数据分析及优化

    (base) [root@pyspark conf]# cat hive-site.xml
    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
    <property>
    <name>hive.exec.scratchdir</name>
    <value>/root/hadoop/hadoop-2.9.2/hive/apache-hive-3.1.2-bin/tmp</value>
    </property>
    <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/root/hadoop/hadoop-2.9.2/hive/apache-hive-3.1.2-bin/warehouse</value>
    </property>
    <property>
    <name>hive.querylog.location</name>
    <value>/root/hadoop/hadoop-2.9.2/hive/apache-hive-3.1.2-bin/log</value>
    </property>


    <property>
    <name>hive.metastore.local</name>
    <value>true</value>
    </property>
    <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
    </property>
    <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
    </property>
    <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    <description>Username to use against metastore database</description>
    </property>
    <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>MyNewPass4!</value>
    <description>password to use against metastore database</description>
    </property>

    </configuration>

    schematool -initSchema -dbType mysql

     nohup ./hiveserver2 &

     beeline -u jdbc:hive2://localhost:10000 -n username  -p password

    Hive事务操作需要设置两个参数:

    set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

    set hive.support.concurrency=true;

    修改 hive log 目录
    hive-exec-log4j.properties.template hive-exec-log4j.properties

    hive-log4j.properties.template hive-log4j.properties
    hive.log.dir=
    bin/Hive
    Hive MapREduce Task任务数设定
    set hive.exec.mode.local.auto=false;
    对于MapReduce程序来讲,有多少个MapTask和ReduceTask
    由什么来决定?
    map task;
    根据需要处理的数据量来决定,
    数据有多少个Block块儿,就有多少个MapTask;
    reduce task;
    可以直接设定;
    job.setNumReduceTasks(2);
    reduce 有几个,输出文件就有几个
    对于Hive来讲,本质也是MapReduce程序,这个参数怎么设定?
    Map Task:
    Block块儿决定;
    Reduce Task:

    默认值:
    设置每个Reduce Task处理的数据量。
    set hive.exec.reduce.bytes.per.reduce =
    256000000 256MB
    限制一个MapReduce中最大的ReduceTask的数量
    默认:1009,最大值:99999,通过看输出的文件名得知。
    set hive.exec.reducers.max = 1009
    MapReduce reduce task的个数,-1表示不设置

    set mapreduce.job.reduces = -1

    hive:desc function when
    desc function substring

    select
    t.area_group,count(1) as total
    from
    (
    select
    house_area,
    CASE
    WHEN 0<house_area and house_area <=50 then "50平以下"
    WHEN 50<house_area and house_area <=70 then "50-70平"
    WHEN 70<house_area and house_area <= 90 then "70-90平"
    WHEN 90<house_area and house_area <=110 then "90-110平"
    ELSE "110以上"
    END AS area_group
    from
    db_lianjia.tb_sh_price limit 100;
    )t
    group by t.area_group
    order by total desc;

    --length 获取字符串长度
    --trim 去除字符串两端的空字符" helloworld "->"helloworld"


    select t.house_age,count(1) AS total
    from
    (
    select
    build_date,
    CASE
    WHEN (2019-substring(buid_date,0,4))between 0 and 5 then "5年以内"
    WHEN (2019-substring(buid_date,0,4))between 5 and 10 then "10年以内"
    WHEN (2019-substring(buid_date,0,4))between 10 and 15 then "15年以内"
    WHEN (2019-substring(buid_date,0,4))between 15 and 20 then "20年以内"
    ELSE "20年以上"
    END AS house_age
    from
    db_lianjia.tb_sh_price
    where length(trim(build_date))>3
    )t
    group by t.house_age
    order by total desc;

    hive数据类型自动转换

    ========================================================================
    hadoop4个服务进程启动
    namenode,datanode,resourcemanager,nodemanager;
    MySQL服务进程启动 sudo service mysqld status
    http://hive.apache.org/
    Hive干什么?
    将SQL语言转换成MapReduce程序,并提交到yarn上运行;
    读取HDFS上数据进行处理;
    SQL Query:类似MySQL数据库的SQL
    概念SQL onHadoop
    -Hive Facebook开源的项目
    -presto 京东
    -impala
    -spark SQL

    MySQL数据库
    DML:数据操语言
    对数据的操作相关的SQL
    insert,update,delete,select;
    DDL:数据定义语言
    对表相关的操作;
    create,alter,drop,delete;


    MapReduce经典程序WordCount
    Hadoop处理数据的过程
    hadoop spark hive spark
    分割,转换,
    (hadoop,1),(spark,1)......
    然后对相加结果

    archive.apache.org/dist
    http://archive.apache.org/dist/hive/hive-1.2.1/


    http://archive.apache.org/dist/kafka/0.10.1.1/
    https://www.dybee.tv/87911.html
    2部分东西需要存储
    数据文件
    HDFS
    元数据metadata
    记录 有哪些数据库,有哪些表
    默认情况下存储derby,derby是一个嵌入式数据库,
    每次仅支持一个会话访问
    https://cwiki.apache.org/confluence/display/Hive/Home#Home-UserDocumentation

    Hive是一个工具,SQL转成MapReduce,提交到Yarn运行,读取HDFS上的数据
    使用Hive分析数据
    1.建立数据库和表
    2.将数据加载到表中
    3.写SQL分析
    join两个表进行联合查询(合并) 笛卡尔积?
    SQL join
    内连接
    join
    外连接

    左(外)连接
    左表的所有行都在(保留)
    select

    from
    A
    left join
    B

    右(外)连接


    join列增多,列连接

    union 连接行
    行增多

    设置本地运行模式
    set hive.exec.mode.local.auto= true;
    desc formatted table_name;
    Navicate 客户端
    导入数据的方式
    方式一;load
    加载本地数据
    LOAD DATA LOCAL INPATH '/LOCLAPATH' INTO TABLE XXX;
    加载HDFS数据
    LOAD DATA INPATH 'HDFSPATH' INTO TABLE XXX;
    相当于将文件移动到了对应表的HDFS目录
    方拾二:put
    直接使用put命令将文件上传到HDFS目录

    数据分析结果的保存
    结果不可能只是在控制台打印就完事
    将结果保存到第二张表中
    create table db_hive.result__xx AS
    select statment;
    Hive创建表的方式
    create table XXX(field);
    create table AS select xxxx;

    Hive的优化
    --使用临时表作为优化
    临时表将需要的字段存储起来,去除暂时用不上的数据这样减少了查询的字段数据

    Hive的运行模式
    MapReduce开发程序
    本地模式
    localmode,所有任务运行在一个JVM中
    开发模式,IDEA 点击右键运行
    集群模式
    YARN,每个Task各自运行在一个JVM中

    hive运行模式
    hive.exec.mode.local.auto
    如果设置为true;hive框架依据处理分析数据的大小进行判断;
    如果处理的数据小于Block(128MB),MapReduce运行在本地 ---????
    配置方式:
    hive cli 中设置,交互式命令行中设置;
    本次会话有效

    配置文件中配置hive-site.xml

    实际运用中
    编写脚本(命令集合),这些命令在linux中能够直接运行。
    bin/hive -help
    执行的SQL只有一条
    bin/hive -e ""
    执行的SQL有多条,将多条SQL放到文件中
    bin/hive -f xxx.sql
    创建数据库
    创建表
    加载数据LOAD
    执行查询SQL
    保存结果

    Hive表的类型
    MANAGED_TABLE
    管理表

    EXTERNAL_TABLE
    外部表external
    区别
    在删除表的时候,外部表只删除元数据,管理表既删除元数据也删除数据文件
    在企业中,往往使用外部表,为了数据安全
    hive在HDFS上数据管理,目录/user/hive/warehouse/
    数据库名称.db/表名称/文件s
    HADOOP
    解决了大数据存储:HDFS
    解决了大数据计算:MapReduce
    元数据:描述数据的数据
    5TB文件,记录这5TB文件是如何存储的数据
    Hadoop 2.x
    sbin/hadoop-daemon.sh start namenode
    sbin/hadoop-daemon.sh start datanode
    sbin/yarn-daemon.sh start resoucemanager
    sbin/yarn-daemon.sh start nodemanger
    模块-用户名-进程名称-主机

    logs
    *.log 程序启动日志
    *.out 程序运行日志
    tail -100 logs/*.log
    cat more less

    --hdfs
    --特点
    分布式,主从
    namenode
    --存储元数据
    --接收用户请求
    --管理所有从节点
    datanode
    分块
    默认大小:block 128MB
    副本
    --mapreduce
    分布式计算模型
    input
    mapper
    shuffle
    reduce
    output
    --yarn
    分布式的集群资源管理和任务调度
    分布式
    resourcemanager
    --管理集群资源
    --任务调度
    --管理所有从节点
    nodemanager
    mapreduce执行过程
    --input
    默认是从HDFS上读取数据
    只需要指定path
    --将每一行转换为keyvalue
    --输出:
    key value
    行偏移量 行内容
    --mapper
    --输入:
    input的输出
    map方法,一样调用一次map方法
    每一行内容进行分割
    输出
    key value
    单词 1

    --shuffle
    功能:
    分组:按照相同的key进行分组
    分区;
    排序:按照字典排序

    --reduce
    输入;
    key value
    单词 <1,1,1>
    reduce方法:每一种key会调用一次reduce方法

    bin/yarn jar
    share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar
    wordcount /data/input.data /data/output/output1
    补充
    MRHistoryServer
    配置MapReduce历史服务器,用来监控已经完成的MapReduce作业执行情况
    查看 有多少个map,多少reduce任务,什么时候提交,运行时间,什么时候完成的

    场景
    MapREduce程序是在半夜执行的,9点之后上班,有个Mapreduce任务执行失败
    ,应该看一下任务的运行过程。

    配置
    mapred-site.xml
    mapreduce.jobhistory.address 0.0.0.0:10020
    mapreduce.jobhistory.webapp.address 0.0.0.0:19888
    mapreduce.jobhistory.intermediate-done-dir
    mapreduce.jobhistory.done-dir
    mapreduce.jobhistory.cleaner.enable

    端口:
    HDFS(RPC):8020
    web端口:50070
    YARN:
    内部端口(RPC):8032
    web端口:8088
    history
    内部端口(RPC):10020
    web端口:19888

    启动histroy server

    sbin/mr-jobhistory-daemon.sh start history
    [root@pyspark hadoop-2.9.2]# sbin/mr-jobhistory-daemon.sh start historyserver
    starting historyserver, logging to /root/hadoop/hadoop-2.9.2/logs/mapred-root-historyserver-pyspark.out
    (base) [root@pyspark hadoop-2.9.2]# jps
    2578 NameNode
    3218 NodeManager
    2918 SecondaryNameNode
    25750 Jps
    25703 JobHistoryServer
    2700 DataNode
    3101 ResourceManager
    (base) [root@pyspark hadoop-2.9.2]#

    日志聚集功能:
    当mapreduce程序在Yarn运行的时候,会产生日志文件,我们将日志文件统计上传到
    HDFS目录中,以便后续监控查看
    yarn-site.xml
    yarn.log-aggregation-enable
    yarn.log-aggregation-retain-seconds
    yanr.log-aggregation-retain-check-interval-seconds
    重启yarn jobHistoryserver

    Driver
    --pojo 不继承不实现
    --继承实现 --官方推荐extends Configured implements Tool
    --不继承只实现 企业中的比较多 implements Tool
    --
    分布式:
    多台机器
    将东西进行划分,每台机器存储一部分,各个机器之间协作完成。
    主从架构
    主节点
    项目经理 管理者,调度这
    从节点
    被管理者,干活的
    分布式文件系统HDFS
    主节点 NameNode
    将文件划分为Block块进行存储(128MB)
    每个Block副本数是3个,
    从节点 DataNode
    管理存在当前DataNode所在机器上所有数据块Block
    分布式集群资源管理Yarn
    集群资源管理(CPU,MEM)
    任务(Job)调度
    主节点:ResourceManager(JVM进程)
    管理整个集群的资源
    接收客户端提供的应用
    从节点:NodeManager(JVM进程)
    管理当前台机器的资源
    很多程序可以运行在Yarn
    -MapReduce 并行数据处理框架
    -Spark 基于内存分布式计算框架
    sbin/yarn-daemon.sh start resourcemanager
    sbin/yarn-daemon.sh start nodemanager
    验证:
    方式一:jps 查看进程名称
    方式二:页面访问hostname:8088端口
    MapReduce配置
    并行计算框架
    思想:分而治之
    核心:
    Map:分
    并行处理数据,将数据分割,一部分一部分的处理
    Reduce:
    将Map处理的结果进行合并,包含一些业务逻辑在里面

    mapred-site.xml
    <properties>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
    </properties>
    测试MapReduce程序
    准备数据:存储在HDFS上
    MapReduce程序
    HADOOP官方文档已经提供了,可以直接用
    WORDCOUNT 词频统计
    share/hadoop/mapreduce/hadoop-mapreduce-example-2.7.3.jar
    mapreduce程序提交到Yarn上运行
    提交方式只有一种:
    bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-example-2.7.3.jar
    HDFS API访问文件
    MapReduce数据处理的过程:
    整个MapReduce程序中,所有数据都是以键值对(key,value)形式流动的
    1.input

    2.map
    shuffer 属于map和reduce中间阶段,框架会自动处理,我们可以不干预;
    分区;
    分组;
    排序:
    3.reduce
    4.output
    --对于input和output,正常情况下,不需要编写什么代码,主要是给一个路径即可。
    --核心关注map和reduce
    有对应Mapper类
    map(KEYIN key,VALUEIN value,Context context)
    key:行偏移量 毛用都没有
    value:行内容
    key(long) value(text)
    0 hadoop hive spark
    17 java hive hadoop
    输出
    key(text) value(int)
    hadoop 1
    hive 1
    spark 1
    java 1
    hive 1
    hadoop 1

    reduce 有对应Reduce类
    reduce(KEYIN key,Iterable<VALUEIN> values,Context context)
    key:业务需要中key的值,也就是map输出的key
    Iterable:集合()数组
    输入:
    key value
    hadoop <1,1>
    hive <1,1>
    java <1>
    spark <1>
    输出:
    key(text) value(int)
    hadoop 2
    hive 2
    java 1
    spark 1
    如果需要处理的数据有1000行,
    map方法就会执行1000次;
    reduce 有几种key就会执行几次
    Hadoop的数据类型
    Text:文本
    text ->string toString
    IntWriteble:int的包装类
    int -> IntWritable :用set
    IntWritable->int :用 get
    LongWritable:Long的包装类
    完成WordCount 程序之后打包,提价任务
    mvn clean
    mvn package
    HiveCountToolsUDF-1.0-SNAPSHOT.jar 上传到Hadoopo集群

    [root@pyspark hadoop]# yarn jar HiveCountToolsUDF-1.0-SNAPSHOT.jar com.yuejiesong.mapreduce.WordCountMapReduce /1.data /wordcountoutput/
    20/03/01 11:55:33 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
    20/03/01 11:55:37 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    20/03/01 11:55:39 INFO input.FileInputFormat: Total input files to process : 1
    20/03/01 11:55:40 INFO mapreduce.JobSubmitter: number of splits:1
    20/03/01 11:55:40 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
    20/03/01 11:55:41 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1582944287000_0001
    20/03/01 11:55:43 INFO impl.YarnClientImpl: Submitted application application_1582944287000_0001
    20/03/01 11:55:43 INFO mapreduce.Job: The url to track the job: http://pyspark:8088/proxy/application_1582944287000_0001/
    20/03/01 11:55:43 INFO mapreduce.Job: Running job: job_1582944287000_0001
    20/03/01 11:56:10 INFO mapreduce.Job: Job job_1582944287000_0001 running in uber mode : false
    20/03/01 11:56:10 INFO mapreduce.Job: map 0% reduce 0%
    20/03/01 11:56:40 INFO mapreduce.Job: map 100% reduce 0%
    20/03/01 11:57:05 INFO mapreduce.Job: map 100% reduce 100%
    20/03/01 11:57:07 INFO mapreduce.Job: Job job_1582944287000_0001 completed successfully
    20/03/01 11:57:07 INFO mapreduce.Job: Counters: 49
    File System Counters
    FILE: Number of bytes read=3900753
    FILE: Number of bytes written=8198265
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=1896718
    HDFS: Number of bytes written=183614
    HDFS: Number of read operations=6
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
    Job Counters
    Launched map tasks=1
    Launched reduce tasks=1
    Data-local map tasks=1
    Total time spent by all maps in occupied slots (ms)=23795
    Total time spent by all reduces in occupied slots (ms)=22119
    Total time spent by all map tasks (ms)=23795
    Total time spent by all reduce tasks (ms)=22119
    Total vcore-milliseconds taken by all map tasks=23795
    Total vcore-milliseconds taken by all reduce tasks=22119
    Total megabyte-milliseconds taken by all map tasks=24366080
    Total megabyte-milliseconds taken by all reduce tasks=22649856
    Map-Reduce Framework
    Map input records=8598
    Map output records=335454
    Map output bytes=3229839
    Map output materialized bytes=3900753
    Input split bytes=97
    Combine input records=0
    Combine output records=0
    Reduce input groups=16985
    Reduce shuffle bytes=3900753
    Reduce input records=335454
    Reduce output records=16985
    Spilled Records=670908
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=3403
    CPU time spent (ms)=16580
    Physical memory (bytes) snapshot=452075520
    Virtual memory (bytes) snapshot=4231335936
    Total committed heap usage (bytes)=297271296
    Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
    File Input Format Counters
    Bytes Read=1896621
    File Output Format Counters
    Bytes Written=183614







    在企业汇总部署Hadoop集群的时候,HDFS的DataNode和Yarn的NodeManager基本上出现在同一个机器上
    ,充分利用资源。

    Map以一条记录为单位做映射
    Reduce以一组数据为单位进行计算
    什么叫做一组,分组
    具有相同特征的数据为一组,特征为key
    依赖一种格式<key,value>
    key,value 由map映射实现
    Map:
    --映射,变化,过滤
    --1进N出
    Reduc:
    --分解,缩小,归纳
    --一组进N出
    (key,val):
    --键值对的健划分数据分组
    CPU密集型计算
    IP密集型计算
    split <Block
    split = Block
    split > Block
    split对应文件内容范围

    控制文件粒度-并行度

    map的并行度由split(切片)的数量决定的
    一个split对应一个map计算
    map计算程序向数据位置移动
    reduce的并行度由key的种类决定

    一个reduce可以处理多可key类型的结果,
    但是是顺序执行的,先计算一个组,再计算另外一个组
    组是最小粒度,不可再拆分
    reduce的并行度you ren lai jueding

    MR:
    数据以一条记录为单位经过map方法映射kv,
    相同的key为一组,这一组数据调用一次reduce方法,在方法内迭代计算这一组数据。
    java 迭代器模式
    数据集:一般使用迭代计算方式
    [root@pyspark ~]# jps
    46688 RunJar
    26305 ResourceManager
    2578 NameNode
    46835 Jps
    2918 SecondaryNameNode
    26390 NodeManager
    26745 JobHistoryServer
    2700 DataNode
    46814 MRAppMaster --MapReduce进程

    deprecated


    what,why,how

    Hive优化:
    1.使用中间表进行优化
    将我们需要的字段,放到一个中间表中,分析业务的指标的时候使用中间表。
    2.压缩
    hive底层就是MapReduce程序
    可以对MapReduce程序Map的输出设置为压缩
    网络和磁盘IO
    mapreduce shuffer过程
    2个阶段:Map端 shuffle ,reduce端shuffle
    a.3个必选项
    分区;patitioner 决定MapTask输出的数据,由哪个reduceTask处理
    排序;
    分组:
    b.2个可选项(优化点)
    map端shuffle
    -合并combiner
    在map端进行reduce聚合操作,并不是所有的mapreduce程序都可以设置combiner操作;
    -压缩compress
    压缩map输出的结果,减少网络和磁盘IO
    常见压缩算法
    lzo,snappy,lz4

    总结:
    -a.对于MapReduce程序来说
    mapreduce.map.out.compress
    mapreduce.map.output.compress.codec
    -b.对于hive来说
    set hive。exec.compress.intermediate
    mapreduce.map.out.compress
    mapreduce.map.output.compress.codec
    -c.配置方式:
    mapred-site.xml
    mapreduce.map.out.compress
    mapreduce.map.output.compress.codec
    hive-site.xml

    mapreduce.map.out.compress
    mapreduce.map.output.compress.codec

    https://www.bilibili.com/video/av50214059?p=9
    bin/hadoop checknative
    3.列式存储
    数据存储在文件中,按照列来存储的,
    同一列的所有数据存放在一起。
    文件格式:

    文本文件:
    按照行存储,
    a.windows下使用记事本,可以打开文件
    b.linux 系统下,使用cat/more/head/tail/less打开文件

    Hive压缩和列式存储
    压缩有很多种,
    列式存储的方式也有很多种
    压缩和列式存储可以一起使用。
    --parquet 列存储
    snappy压缩
    set parquet.compresstion =snappy;
    orc列式存储+snappy压缩
    企业中通常采用:
    parquet+snappy
    orc+snappy
    使用列式存储的好处
    a.表中d 文件数据大小减少了->减少了磁盘存储空间,磁盘IO
    b.分析数据的时候,加载的数据量变少了。

  • 相关阅读:
    how to pass a Javabean to server In Model2 architecture.
    What is the Web Appliation Archive, abbreviation is "WAR"
    Understaning Javascript OO
    Genetic Fraud
    poj 3211 Washing Clothes
    poj 2385 Apple Catching
    Magic Star
    关于memset的用法几点
    c++ 函数
    zoj 2972 Hurdles of 110m
  • 原文地址:https://www.cnblogs.com/songyuejie/p/12555004.html
Copyright © 2011-2022 走看看