zoukankan      html  css  js  c++  java
  • Hive数据分析及优化

    (base) [root@pyspark conf]# cat hive-site.xml
    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
    <property>
    <name>hive.exec.scratchdir</name>
    <value>/root/hadoop/hadoop-2.9.2/hive/apache-hive-3.1.2-bin/tmp</value>
    </property>
    <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/root/hadoop/hadoop-2.9.2/hive/apache-hive-3.1.2-bin/warehouse</value>
    </property>
    <property>
    <name>hive.querylog.location</name>
    <value>/root/hadoop/hadoop-2.9.2/hive/apache-hive-3.1.2-bin/log</value>
    </property>


    <property>
    <name>hive.metastore.local</name>
    <value>true</value>
    </property>
    <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
    </property>
    <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
    </property>
    <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    <description>Username to use against metastore database</description>
    </property>
    <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>MyNewPass4!</value>
    <description>password to use against metastore database</description>
    </property>

    </configuration>

    schematool -initSchema -dbType mysql

     nohup ./hiveserver2 &

     beeline -u jdbc:hive2://localhost:10000 -n username  -p password

    Hive事务操作需要设置两个参数:

    set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

    set hive.support.concurrency=true;

    修改 hive log 目录
    hive-exec-log4j.properties.template hive-exec-log4j.properties

    hive-log4j.properties.template hive-log4j.properties
    hive.log.dir=
    bin/Hive
    Hive MapREduce Task任务数设定
    set hive.exec.mode.local.auto=false;
    对于MapReduce程序来讲,有多少个MapTask和ReduceTask
    由什么来决定?
    map task;
    根据需要处理的数据量来决定,
    数据有多少个Block块儿,就有多少个MapTask;
    reduce task;
    可以直接设定;
    job.setNumReduceTasks(2);
    reduce 有几个,输出文件就有几个
    对于Hive来讲,本质也是MapReduce程序,这个参数怎么设定?
    Map Task:
    Block块儿决定;
    Reduce Task:

    默认值:
    设置每个Reduce Task处理的数据量。
    set hive.exec.reduce.bytes.per.reduce =
    256000000 256MB
    限制一个MapReduce中最大的ReduceTask的数量
    默认:1009,最大值:99999,通过看输出的文件名得知。
    set hive.exec.reducers.max = 1009
    MapReduce reduce task的个数,-1表示不设置

    set mapreduce.job.reduces = -1

    hive:desc function when
    desc function substring

    select
    t.area_group,count(1) as total
    from
    (
    select
    house_area,
    CASE
    WHEN 0<house_area and house_area <=50 then "50平以下"
    WHEN 50<house_area and house_area <=70 then "50-70平"
    WHEN 70<house_area and house_area <= 90 then "70-90平"
    WHEN 90<house_area and house_area <=110 then "90-110平"
    ELSE "110以上"
    END AS area_group
    from
    db_lianjia.tb_sh_price limit 100;
    )t
    group by t.area_group
    order by total desc;

    --length 获取字符串长度
    --trim 去除字符串两端的空字符" helloworld "->"helloworld"


    select t.house_age,count(1) AS total
    from
    (
    select
    build_date,
    CASE
    WHEN (2019-substring(buid_date,0,4))between 0 and 5 then "5年以内"
    WHEN (2019-substring(buid_date,0,4))between 5 and 10 then "10年以内"
    WHEN (2019-substring(buid_date,0,4))between 10 and 15 then "15年以内"
    WHEN (2019-substring(buid_date,0,4))between 15 and 20 then "20年以内"
    ELSE "20年以上"
    END AS house_age
    from
    db_lianjia.tb_sh_price
    where length(trim(build_date))>3
    )t
    group by t.house_age
    order by total desc;

    hive数据类型自动转换

    ========================================================================
    hadoop4个服务进程启动
    namenode,datanode,resourcemanager,nodemanager;
    MySQL服务进程启动 sudo service mysqld status
    http://hive.apache.org/
    Hive干什么?
    将SQL语言转换成MapReduce程序,并提交到yarn上运行;
    读取HDFS上数据进行处理;
    SQL Query:类似MySQL数据库的SQL
    概念SQL onHadoop
    -Hive Facebook开源的项目
    -presto 京东
    -impala
    -spark SQL

    MySQL数据库
    DML:数据操语言
    对数据的操作相关的SQL
    insert,update,delete,select;
    DDL:数据定义语言
    对表相关的操作;
    create,alter,drop,delete;


    MapReduce经典程序WordCount
    Hadoop处理数据的过程
    hadoop spark hive spark
    分割,转换,
    (hadoop,1),(spark,1)......
    然后对相加结果

    archive.apache.org/dist
    http://archive.apache.org/dist/hive/hive-1.2.1/


    http://archive.apache.org/dist/kafka/0.10.1.1/
    https://www.dybee.tv/87911.html
    2部分东西需要存储
    数据文件
    HDFS
    元数据metadata
    记录 有哪些数据库,有哪些表
    默认情况下存储derby,derby是一个嵌入式数据库,
    每次仅支持一个会话访问
    https://cwiki.apache.org/confluence/display/Hive/Home#Home-UserDocumentation

    Hive是一个工具,SQL转成MapReduce,提交到Yarn运行,读取HDFS上的数据
    使用Hive分析数据
    1.建立数据库和表
    2.将数据加载到表中
    3.写SQL分析
    join两个表进行联合查询(合并) 笛卡尔积?
    SQL join
    内连接
    join
    外连接

    左(外)连接
    左表的所有行都在(保留)
    select

    from
    A
    left join
    B

    右(外)连接


    join列增多,列连接

    union 连接行
    行增多

    设置本地运行模式
    set hive.exec.mode.local.auto= true;
    desc formatted table_name;
    Navicate 客户端
    导入数据的方式
    方式一;load
    加载本地数据
    LOAD DATA LOCAL INPATH '/LOCLAPATH' INTO TABLE XXX;
    加载HDFS数据
    LOAD DATA INPATH 'HDFSPATH' INTO TABLE XXX;
    相当于将文件移动到了对应表的HDFS目录
    方拾二:put
    直接使用put命令将文件上传到HDFS目录

    数据分析结果的保存
    结果不可能只是在控制台打印就完事
    将结果保存到第二张表中
    create table db_hive.result__xx AS
    select statment;
    Hive创建表的方式
    create table XXX(field);
    create table AS select xxxx;

    Hive的优化
    --使用临时表作为优化
    临时表将需要的字段存储起来,去除暂时用不上的数据这样减少了查询的字段数据

    Hive的运行模式
    MapReduce开发程序
    本地模式
    localmode,所有任务运行在一个JVM中
    开发模式,IDEA 点击右键运行
    集群模式
    YARN,每个Task各自运行在一个JVM中

    hive运行模式
    hive.exec.mode.local.auto
    如果设置为true;hive框架依据处理分析数据的大小进行判断;
    如果处理的数据小于Block(128MB),MapReduce运行在本地 ---????
    配置方式:
    hive cli 中设置,交互式命令行中设置;
    本次会话有效

    配置文件中配置hive-site.xml

    实际运用中
    编写脚本(命令集合),这些命令在linux中能够直接运行。
    bin/hive -help
    执行的SQL只有一条
    bin/hive -e ""
    执行的SQL有多条,将多条SQL放到文件中
    bin/hive -f xxx.sql
    创建数据库
    创建表
    加载数据LOAD
    执行查询SQL
    保存结果

    Hive表的类型
    MANAGED_TABLE
    管理表

    EXTERNAL_TABLE
    外部表external
    区别
    在删除表的时候,外部表只删除元数据,管理表既删除元数据也删除数据文件
    在企业中,往往使用外部表,为了数据安全
    hive在HDFS上数据管理,目录/user/hive/warehouse/
    数据库名称.db/表名称/文件s
    HADOOP
    解决了大数据存储:HDFS
    解决了大数据计算:MapReduce
    元数据:描述数据的数据
    5TB文件,记录这5TB文件是如何存储的数据
    Hadoop 2.x
    sbin/hadoop-daemon.sh start namenode
    sbin/hadoop-daemon.sh start datanode
    sbin/yarn-daemon.sh start resoucemanager
    sbin/yarn-daemon.sh start nodemanger
    模块-用户名-进程名称-主机

    logs
    *.log 程序启动日志
    *.out 程序运行日志
    tail -100 logs/*.log
    cat more less

    --hdfs
    --特点
    分布式,主从
    namenode
    --存储元数据
    --接收用户请求
    --管理所有从节点
    datanode
    分块
    默认大小:block 128MB
    副本
    --mapreduce
    分布式计算模型
    input
    mapper
    shuffle
    reduce
    output
    --yarn
    分布式的集群资源管理和任务调度
    分布式
    resourcemanager
    --管理集群资源
    --任务调度
    --管理所有从节点
    nodemanager
    mapreduce执行过程
    --input
    默认是从HDFS上读取数据
    只需要指定path
    --将每一行转换为keyvalue
    --输出:
    key value
    行偏移量 行内容
    --mapper
    --输入:
    input的输出
    map方法,一样调用一次map方法
    每一行内容进行分割
    输出
    key value
    单词 1

    --shuffle
    功能:
    分组:按照相同的key进行分组
    分区;
    排序:按照字典排序

    --reduce
    输入;
    key value
    单词 <1,1,1>
    reduce方法:每一种key会调用一次reduce方法

    bin/yarn jar
    share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar
    wordcount /data/input.data /data/output/output1
    补充
    MRHistoryServer
    配置MapReduce历史服务器,用来监控已经完成的MapReduce作业执行情况
    查看 有多少个map,多少reduce任务,什么时候提交,运行时间,什么时候完成的

    场景
    MapREduce程序是在半夜执行的,9点之后上班,有个Mapreduce任务执行失败
    ,应该看一下任务的运行过程。

    配置
    mapred-site.xml
    mapreduce.jobhistory.address 0.0.0.0:10020
    mapreduce.jobhistory.webapp.address 0.0.0.0:19888
    mapreduce.jobhistory.intermediate-done-dir
    mapreduce.jobhistory.done-dir
    mapreduce.jobhistory.cleaner.enable

    端口:
    HDFS(RPC):8020
    web端口:50070
    YARN:
    内部端口(RPC):8032
    web端口:8088
    history
    内部端口(RPC):10020
    web端口:19888

    启动histroy server

    sbin/mr-jobhistory-daemon.sh start history
    [root@pyspark hadoop-2.9.2]# sbin/mr-jobhistory-daemon.sh start historyserver
    starting historyserver, logging to /root/hadoop/hadoop-2.9.2/logs/mapred-root-historyserver-pyspark.out
    (base) [root@pyspark hadoop-2.9.2]# jps
    2578 NameNode
    3218 NodeManager
    2918 SecondaryNameNode
    25750 Jps
    25703 JobHistoryServer
    2700 DataNode
    3101 ResourceManager
    (base) [root@pyspark hadoop-2.9.2]#

    日志聚集功能:
    当mapreduce程序在Yarn运行的时候,会产生日志文件,我们将日志文件统计上传到
    HDFS目录中,以便后续监控查看
    yarn-site.xml
    yarn.log-aggregation-enable
    yarn.log-aggregation-retain-seconds
    yanr.log-aggregation-retain-check-interval-seconds
    重启yarn jobHistoryserver

    Driver
    --pojo 不继承不实现
    --继承实现 --官方推荐extends Configured implements Tool
    --不继承只实现 企业中的比较多 implements Tool
    --
    分布式:
    多台机器
    将东西进行划分,每台机器存储一部分,各个机器之间协作完成。
    主从架构
    主节点
    项目经理 管理者,调度这
    从节点
    被管理者,干活的
    分布式文件系统HDFS
    主节点 NameNode
    将文件划分为Block块进行存储(128MB)
    每个Block副本数是3个,
    从节点 DataNode
    管理存在当前DataNode所在机器上所有数据块Block
    分布式集群资源管理Yarn
    集群资源管理(CPU,MEM)
    任务(Job)调度
    主节点:ResourceManager(JVM进程)
    管理整个集群的资源
    接收客户端提供的应用
    从节点:NodeManager(JVM进程)
    管理当前台机器的资源
    很多程序可以运行在Yarn
    -MapReduce 并行数据处理框架
    -Spark 基于内存分布式计算框架
    sbin/yarn-daemon.sh start resourcemanager
    sbin/yarn-daemon.sh start nodemanager
    验证:
    方式一:jps 查看进程名称
    方式二:页面访问hostname:8088端口
    MapReduce配置
    并行计算框架
    思想:分而治之
    核心:
    Map:分
    并行处理数据,将数据分割,一部分一部分的处理
    Reduce:
    将Map处理的结果进行合并,包含一些业务逻辑在里面

    mapred-site.xml
    <properties>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
    </properties>
    测试MapReduce程序
    准备数据:存储在HDFS上
    MapReduce程序
    HADOOP官方文档已经提供了,可以直接用
    WORDCOUNT 词频统计
    share/hadoop/mapreduce/hadoop-mapreduce-example-2.7.3.jar
    mapreduce程序提交到Yarn上运行
    提交方式只有一种:
    bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-example-2.7.3.jar
    HDFS API访问文件
    MapReduce数据处理的过程:
    整个MapReduce程序中,所有数据都是以键值对(key,value)形式流动的
    1.input

    2.map
    shuffer 属于map和reduce中间阶段,框架会自动处理,我们可以不干预;
    分区;
    分组;
    排序:
    3.reduce
    4.output
    --对于input和output,正常情况下,不需要编写什么代码,主要是给一个路径即可。
    --核心关注map和reduce
    有对应Mapper类
    map(KEYIN key,VALUEIN value,Context context)
    key:行偏移量 毛用都没有
    value:行内容
    key(long) value(text)
    0 hadoop hive spark
    17 java hive hadoop
    输出
    key(text) value(int)
    hadoop 1
    hive 1
    spark 1
    java 1
    hive 1
    hadoop 1

    reduce 有对应Reduce类
    reduce(KEYIN key,Iterable<VALUEIN> values,Context context)
    key:业务需要中key的值,也就是map输出的key
    Iterable:集合()数组
    输入:
    key value
    hadoop <1,1>
    hive <1,1>
    java <1>
    spark <1>
    输出:
    key(text) value(int)
    hadoop 2
    hive 2
    java 1
    spark 1
    如果需要处理的数据有1000行,
    map方法就会执行1000次;
    reduce 有几种key就会执行几次
    Hadoop的数据类型
    Text:文本
    text ->string toString
    IntWriteble:int的包装类
    int -> IntWritable :用set
    IntWritable->int :用 get
    LongWritable:Long的包装类
    完成WordCount 程序之后打包,提价任务
    mvn clean
    mvn package
    HiveCountToolsUDF-1.0-SNAPSHOT.jar 上传到Hadoopo集群

    [root@pyspark hadoop]# yarn jar HiveCountToolsUDF-1.0-SNAPSHOT.jar com.yuejiesong.mapreduce.WordCountMapReduce /1.data /wordcountoutput/
    20/03/01 11:55:33 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
    20/03/01 11:55:37 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    20/03/01 11:55:39 INFO input.FileInputFormat: Total input files to process : 1
    20/03/01 11:55:40 INFO mapreduce.JobSubmitter: number of splits:1
    20/03/01 11:55:40 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
    20/03/01 11:55:41 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1582944287000_0001
    20/03/01 11:55:43 INFO impl.YarnClientImpl: Submitted application application_1582944287000_0001
    20/03/01 11:55:43 INFO mapreduce.Job: The url to track the job: http://pyspark:8088/proxy/application_1582944287000_0001/
    20/03/01 11:55:43 INFO mapreduce.Job: Running job: job_1582944287000_0001
    20/03/01 11:56:10 INFO mapreduce.Job: Job job_1582944287000_0001 running in uber mode : false
    20/03/01 11:56:10 INFO mapreduce.Job: map 0% reduce 0%
    20/03/01 11:56:40 INFO mapreduce.Job: map 100% reduce 0%
    20/03/01 11:57:05 INFO mapreduce.Job: map 100% reduce 100%
    20/03/01 11:57:07 INFO mapreduce.Job: Job job_1582944287000_0001 completed successfully
    20/03/01 11:57:07 INFO mapreduce.Job: Counters: 49
    File System Counters
    FILE: Number of bytes read=3900753
    FILE: Number of bytes written=8198265
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=1896718
    HDFS: Number of bytes written=183614
    HDFS: Number of read operations=6
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
    Job Counters
    Launched map tasks=1
    Launched reduce tasks=1
    Data-local map tasks=1
    Total time spent by all maps in occupied slots (ms)=23795
    Total time spent by all reduces in occupied slots (ms)=22119
    Total time spent by all map tasks (ms)=23795
    Total time spent by all reduce tasks (ms)=22119
    Total vcore-milliseconds taken by all map tasks=23795
    Total vcore-milliseconds taken by all reduce tasks=22119
    Total megabyte-milliseconds taken by all map tasks=24366080
    Total megabyte-milliseconds taken by all reduce tasks=22649856
    Map-Reduce Framework
    Map input records=8598
    Map output records=335454
    Map output bytes=3229839
    Map output materialized bytes=3900753
    Input split bytes=97
    Combine input records=0
    Combine output records=0
    Reduce input groups=16985
    Reduce shuffle bytes=3900753
    Reduce input records=335454
    Reduce output records=16985
    Spilled Records=670908
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=3403
    CPU time spent (ms)=16580
    Physical memory (bytes) snapshot=452075520
    Virtual memory (bytes) snapshot=4231335936
    Total committed heap usage (bytes)=297271296
    Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
    File Input Format Counters
    Bytes Read=1896621
    File Output Format Counters
    Bytes Written=183614







    在企业汇总部署Hadoop集群的时候,HDFS的DataNode和Yarn的NodeManager基本上出现在同一个机器上
    ,充分利用资源。

    Map以一条记录为单位做映射
    Reduce以一组数据为单位进行计算
    什么叫做一组,分组
    具有相同特征的数据为一组,特征为key
    依赖一种格式<key,value>
    key,value 由map映射实现
    Map:
    --映射,变化,过滤
    --1进N出
    Reduc:
    --分解,缩小,归纳
    --一组进N出
    (key,val):
    --键值对的健划分数据分组
    CPU密集型计算
    IP密集型计算
    split <Block
    split = Block
    split > Block
    split对应文件内容范围

    控制文件粒度-并行度

    map的并行度由split(切片)的数量决定的
    一个split对应一个map计算
    map计算程序向数据位置移动
    reduce的并行度由key的种类决定

    一个reduce可以处理多可key类型的结果,
    但是是顺序执行的,先计算一个组,再计算另外一个组
    组是最小粒度,不可再拆分
    reduce的并行度you ren lai jueding

    MR:
    数据以一条记录为单位经过map方法映射kv,
    相同的key为一组,这一组数据调用一次reduce方法,在方法内迭代计算这一组数据。
    java 迭代器模式
    数据集:一般使用迭代计算方式
    [root@pyspark ~]# jps
    46688 RunJar
    26305 ResourceManager
    2578 NameNode
    46835 Jps
    2918 SecondaryNameNode
    26390 NodeManager
    26745 JobHistoryServer
    2700 DataNode
    46814 MRAppMaster --MapReduce进程

    deprecated


    what,why,how

    Hive优化:
    1.使用中间表进行优化
    将我们需要的字段,放到一个中间表中,分析业务的指标的时候使用中间表。
    2.压缩
    hive底层就是MapReduce程序
    可以对MapReduce程序Map的输出设置为压缩
    网络和磁盘IO
    mapreduce shuffer过程
    2个阶段:Map端 shuffle ,reduce端shuffle
    a.3个必选项
    分区;patitioner 决定MapTask输出的数据,由哪个reduceTask处理
    排序;
    分组:
    b.2个可选项(优化点)
    map端shuffle
    -合并combiner
    在map端进行reduce聚合操作,并不是所有的mapreduce程序都可以设置combiner操作;
    -压缩compress
    压缩map输出的结果,减少网络和磁盘IO
    常见压缩算法
    lzo,snappy,lz4

    总结:
    -a.对于MapReduce程序来说
    mapreduce.map.out.compress
    mapreduce.map.output.compress.codec
    -b.对于hive来说
    set hive。exec.compress.intermediate
    mapreduce.map.out.compress
    mapreduce.map.output.compress.codec
    -c.配置方式:
    mapred-site.xml
    mapreduce.map.out.compress
    mapreduce.map.output.compress.codec
    hive-site.xml

    mapreduce.map.out.compress
    mapreduce.map.output.compress.codec

    https://www.bilibili.com/video/av50214059?p=9
    bin/hadoop checknative
    3.列式存储
    数据存储在文件中,按照列来存储的,
    同一列的所有数据存放在一起。
    文件格式:

    文本文件:
    按照行存储,
    a.windows下使用记事本,可以打开文件
    b.linux 系统下,使用cat/more/head/tail/less打开文件

    Hive压缩和列式存储
    压缩有很多种,
    列式存储的方式也有很多种
    压缩和列式存储可以一起使用。
    --parquet 列存储
    snappy压缩
    set parquet.compresstion =snappy;
    orc列式存储+snappy压缩
    企业中通常采用:
    parquet+snappy
    orc+snappy
    使用列式存储的好处
    a.表中d 文件数据大小减少了->减少了磁盘存储空间,磁盘IO
    b.分析数据的时候,加载的数据量变少了。

  • 相关阅读:
    Windows的全新文件系统:ReFS
    Eclipse Virgo 3.0发布了
    Web Sites and a Plugin Free Web
    PhoneGap现已完全支持WP7
    深入探索PowerPivot客户端和服务器端架构
    Funf,一个传感和数据处理的移动框架
    利用Contained Database和DAC来开发基于SQL Server "Denali"和SQL Azure之上的应用程序
    一个漂亮的HTML5后台管理界面模板
    笔试题收集
    笔试面试的准备工作
  • 原文地址:https://www.cnblogs.com/songyuejie/p/12555004.html
Copyright © 2011-2022 走看看