
    1. MapReduce and MySQL integration: a summary

    Use case:

      In many projects the input data set is huge but the final output is small, e.g. PV/UV statistics. To serve real-time queries or OLAP-style needs, we want MapReduce to exchange data with MySQL directly, which is something HBase and Hive still handle poorly today.

    1. Reading data from MySQL

    Reading:

      Hadoop accesses relational databases mainly through the DBInputFormat class, located in the org.apache.hadoop.mapred.lib.db package. DBInputFormat talks to the database through the vendor-supplied JDBC driver and can read records with standard SQL. Two prerequisites must be understood before using DBInputFormat (a job-setup sketch follows the list).

    1. Before using DBInputFormat, the JDBC driver jar must be copied to $HADOOP_HOME/lib/ on every node of the cluster.

    2. When MapReduce reads from a relational database, frequent, high-volume queries from the job put a heavy load on the database. DBInputFormat is therefore only suitable for reading small data sets, not for data-warehouse workloads. For warehouse-scale data, export the data to text with the database's dump tool, upload it to HDFS and process it there; see: http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html
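
      Below is a minimal read-path sketch using the old org.apache.hadoop.mapred.lib.db API described above. The table name pv_uv, its two columns, the JDBC URL and the credentials are placeholders invented for illustration; mapper, reducer and output configuration are omitted.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.db.DBConfiguration;
    import org.apache.hadoop.mapred.lib.db.DBInputFormat;
    import org.apache.hadoop.mapred.lib.db.DBWritable;

    public class MySQLReadJob {
        // One row of the hypothetical pv_uv table.
        public static class PvRecord implements Writable, DBWritable {
            long id;
            String url;

            public void readFields(ResultSet rs) throws SQLException {    // JDBC -> record
                id = rs.getLong(1);
                url = rs.getString(2);
            }
            public void write(PreparedStatement ps) throws SQLException { // record -> JDBC
                ps.setLong(1, id);
                ps.setString(2, url);
            }
            public void readFields(DataInput in) throws IOException {     // Hadoop deserialization
                id = in.readLong();
                url = Text.readString(in);
            }
            public void write(DataOutput out) throws IOException {        // Hadoop serialization
                out.writeLong(id);
                Text.writeString(out, url);
            }
        }

        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(MySQLReadJob.class);
            conf.setInputFormat(DBInputFormat.class);
            // Driver class, URL, user and password are placeholders.
            DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                    "jdbc:mysql://localhost:3306/testdb", "user", "passwd");
            // Read columns id and url from pv_uv, no WHERE condition, ordered by id.
            DBInputFormat.setInput(conf, PvRecord.class, "pv_uv", null, "id", "id", "url");
            // ... set mapper/reducer classes and the output format/path here ...
            JobClient.runJob(conf);
        }
    }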

    Writing data:

       The result of a data-processing job is usually small enough to be written from Hadoop straight into the database. Hadoop provides the following classes for writing computation results directly to a database (a job-setup sketch follows the list):

      1.   DBOutputFormat: the output format for writing to a database.
      2.   DBRecordWriter: writes individual result records to the database.
      3.   DBConfiguration: holds the database configuration and creates the connection.
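
      A matching write-path sketch follows, again a rough illustration rather than a complete job: the table pv_result, its columns and the connection settings are invented placeholders, and the reduce output key class (which must implement DBWritable) is only indicated in the comments.

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.db.DBConfiguration;
    import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

    public class MySQLWriteJob {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(MySQLWriteJob.class);
            conf.setOutputFormat(DBOutputFormat.class);
            // Connection settings are placeholders, as in the read sketch above.
            DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                    "jdbc:mysql://localhost:3306/testdb", "user", "passwd");
            // DBRecordWriter issues "INSERT INTO pv_result (stat_date, pv_count) VALUES (?, ?)"
            // for every reduce output key. The key class (not shown here) must implement
            // DBWritable and fill both parameters in its write(PreparedStatement) method;
            // the value side is ignored, so NullWritable is enough.
            DBOutputFormat.setOutput(conf, "pv_result", "stat_date", "pv_count");
            conf.setOutputValueClass(NullWritable.class);
            // ... set the input format/path, mapper, reducer and the output key class ...
            JobClient.runJob(conf);
        }
    }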

    2. Common Hive commands

      Frequently used Hive SQL commands

      Export query results from Hive:  INSERT OVERWRITE LOCAL DIRECTORY '/tmp/result.txt' select id,name from t_test;

                 hive -e "select id,name from t_test;" > result.txt

    Three ways to connect to Hive:

      1. CLI: essentially every connection keeps its own metadata and sessions do not share state, so it is not suitable for building products or applications.

      2. JDBC: easily brought down by large data volumes; unstable.

      3. Using Hive's Driver class directly:  Driver driver = new Driver(new HiveConf(SessionState.class));

    Connecting to Hive remotely

      hive --service hiveserver -p 50000 &

      This opens port 50000, after which Java clients can connect over JDBC; take note of the jars that are required (a client sketch follows).
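
      A sketch of such a Java client, assuming the HiveServer1 JDBC driver org.apache.hadoop.hive.jdbc.HiveDriver; the host address simply reuses the one from the Python example further down as a placeholder, and the classpath typically needs the hive-jdbc, hive-exec, hive-metastore, hive-service, libthrift, libfb303, hadoop-core and slf4j/log4j jars.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class HiveServer1Client {
        public static void main(String[] args) throws Exception {
            // HiveServer1 JDBC driver (HiveServer2 uses org.apache.hive.jdbc.HiveDriver instead).
            Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
            // Port 50000 matches the "hive --service hiveserver -p 50000 &" command above;
            // HiveServer1 does not authenticate, so user name and password can stay empty.
            Connection conn = DriverManager.getConnection(
                    "jdbc:hive://192.168.30.201:50000/default", "", "");
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery("show tables");
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
            rs.close();
            stmt.close();
            conn.close();
        }
    }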

    Writing HQL results directly into MySQL

    1. First download the mysql-connector-java jar.

    2. Add the required jars in the hive CLI:

    add jar /home/hadoop/hive-0.12.0/lib/hive-contrib-0.12.0.jar;

    add jar /home/hadoop/hive-0.12.0/lib/mysql-connector-java-5.1.27-bin.jar;

    3. Register a short name (temporary function) for the UDF:

    CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput';

    4. Run:

    select dboutput('jdbc:mysql://localhost:port/dbname','db_username','db_pwd','INSERT INTO mysql_table(field1,field2,field3) VALUES (6,?,?)',substr(field_i,1,10),count(field_j)) from hive_table group by substr(field_i,1,10) limit 10;

    Problem:

    Hive keeps complaining that it cannot find org.apache.Hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput (note that the package name is case-sensitive: it should be the all-lowercase org.apache.hadoop...).

    Solution:

    It eventually became clear that the org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput part has to be written by yourself; build it into a jar, register it with add jar, and it works.

    Connecting to Hive from Python
    import sys
    from hive_service import ThriftHive
    from hive_service.ttypes import HiveServerException
    from thrift import Thrift
    from thrift.transport import TSocket
    from thrift.transport import TTransport
    from thrift.protocol import TBinaryProtocol
    
    try:
        transport = TSocket.TSocket('192.168.30.201', 10000)
        transport = TTransport.TBufferedTransport(transport)
        protocol = TBinaryProtocol.TBinaryProtocol(transport)
    
        client = ThriftHive.Client(protocol)
        transport.open()
        hql = '''CREATE TABLE people(a STRING, b INT, c DOUBLE) row format delimited fields terminated by ',' '''
        print hql
    
        client.execute(hql)
        client.execute("LOAD DATA LOCAL INPATH '/home/diver/data.txt' INTO TABLE people")
        #client.execute("SELECT * FROM people")
        #while (1):
        #  row = client.fetchOne()
        #  if (row == None):
        #    break
        #  print row
        client.execute("SELECT count(*) FROM people")
        print client.fetchAll()
    
        transport.close()
    
    except Thrift.TException, tx:
        print '%s' % (tx.message)
    

      

    #!/usr/bin/python
    #-*-coding:UTF-8 -*-
    import sys
    import os
    import string
    import re
    import MySQLdb

    from hive_service import ThriftHive
    from hive_service.ttypes import HiveServerException
    from thrift import Thrift
    from thrift.transport import TSocket
    from thrift.transport import TTransport
    from thrift.protocol import TBinaryProtocol

    def hiveExe(hsql, dbname):
        # Run a Hive query over Thrift and return all result rows.
        try:
            transport = TSocket.TSocket('192.168.10.1', 10000)
            transport = TTransport.TBufferedTransport(transport)
            protocol = TBinaryProtocol.TBinaryProtocol(transport)

            client = ThriftHive.Client(protocol)
            transport.open()

            client.execute('ADD jar /opt/modules/hive/hive-0.7.1/lib/hive-contrib-0.7.1.jar')

            client.execute("use " + dbname)
            row = client.fetchOne()
            # Switching the database returns a single row, so one fetchOne() is enough.

            client.execute(hsql)
            result = client.fetchAll()
            # fetchAll() retrieves every row of the query result.

            transport.close()
            return result

        except Thrift.TException, tx:
            print '%s' % (tx.message)

    def mysqlExe(sql):
        # Create the MySQL day table and load the CSV file into it.
        try:
            conn = MySQLdb.connect(user="test", passwd="test123", host="127.0.0.1", db="active2_ip", port=5029)
        except Exception, data:
            print "Could not connect to MySQL server.:", data
            return
        try:
            cursor = conn.cursor()
            # The sql string contains several ';'-separated statements;
            # MySQLdb executes one statement per call.
            for stmt in sql.split(';'):
                if stmt.strip():
                    cursor.execute(stmt)
            conn.commit()
            cursor.close()
            conn.close()
        except Exception, data:
            print "Could not Fetch anything:", data

    dbname = "active2"
    date = os.popen("date -d '1 day ago' +%Y%m%d").read().strip()
    # Get yesterday's date via the shell and strip the surrounding whitespace.

    sql = "create table IF NOT EXISTS "+dbname+"_group_ip_"+date+" like "+dbname+"_group_ip;load data infile '/tmp/"+dbname+"_"+date+".csv' into table "+dbname+"_group_ip_"+date+" FIELDS TERMINATED BY ','"
    # Create the day table from the template table, then load the CSV data into it.

    hsql = "insert overwrite local directory '/tmp/"+dbname+"_"+date+"' select count(version) as vc,stat_hour,type,version,province,city,isp from "+dbname+"_"+date+" group by province,city,version,type,stat_hour,isp"
    # Hive query; the result is exported to the local directory /tmp/active2_<date> and may consist of several files.

    hiveExe(hsql, dbname)
    # Run the Hive query.

    os.system("sudo cat /tmp/"+dbname+"_"+date+"/* > /tmp/tmplog ")
    # Merge the exported files into a single file, tmplog, via the shell.

    file1 = open("/tmp/tmplog", 'r')
    # Open the merged temporary file.
    file2 = open("/tmp/"+dbname+"_"+date+".csv", 'w')
    # Open the target file and convert the separator: Hive exports with a special delimiter, so replace it with commas to get a CSV.
    sep = ','
    for line in file1:
        tmp = line[:-1].split('\x01')
        # Hive's export delimiter is ASCII 001, i.e. the byte \x01.
        replace = sep.join(tmp)
        file2.write(replace + "\n")

    file1.close()
    file2.close()

    os.system("sudo rm -f /tmp/tmplog")
    # Remove the temporary tmplog file.

    mysqlExe(sql)
    # Run the MySQL statements: create the table and load the data.
    os.system("sudo rm -rf /tmp/"+dbname+"_"+date)
    # Remove the directory exported by Hive (-r because it is a directory).
    

     Thrift is an open-source, cross-language service development framework from Apache. It provides a code-generation engine for building services and supports many programming languages, including C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml and Delphi.

    Generally speaking, Thrift is used in two kinds of scenarios:

    • First, in a large project where several teams collaborate and the members do not all master the same programming languages, Thrift is used to build services across those languages.
    • Second, when companies cooperate, cross-language environments are unavoidable; Thrift then provides cross-platform interoperability similar to Web Services.

    Python connects to Hive in exactly this way, through Thrift.

    #!/bin/sh
    # One-shot installation script for thrift-0.9.0
    # thrift depends on boost, openssl and libevent
    # The variable values below can be adjusted to match your setup
    PROJECT_HOME=$HOME/iflow # project source root directory
    # Directory holding the source tarballs of thrift and its third-party
    # dependencies, which is also used as the install prefix;
    # this script must sit in the same directory as those tarballs
    THIRD_PARTY_HOME=$PROJECT_HOME/third-party
    boost=boost_1_52_0
    openssl=openssl-1.0.1c
    libevent=libevent-2.0.19-stable
    thrift=thrift-0.9.0
    #
    # Install boost
    #
    printf "\n\033[0;32;34minstalling boost\033[m\n"
    tar xzf $boost.tar.gz
    cd $boost
    ./bootstrap.sh
    if test $? -ne 0; then
        exit 1
    fi
    ./b2 install --prefix=$THIRD_PARTY_HOME/boost
    printf "\n\033[0;32;34m./b2 install return $?\033[m\n"
    cd -
    #
    # Install openssl
    #
    printf "\n\033[0;32;34minstalling openssl\033[m\n"
    tar xzf $openssl.tar.gz
    cd $openssl
    ./config --prefix=$THIRD_PARTY_HOME/openssl shared threads
    if test $? -ne 0; then
        exit 1
    fi
    make
    if test $? -ne 0; then
        exit 1
    fi
    make install
    cd -
    #
    # Install libevent
    #
    printf "\n\033[0;32;34minstalling libevent\033[m\n"
    tar xzf $libevent.tar.gz
    cd $libevent
    ./configure --prefix=$THIRD_PARTY_HOME/libevent
    if test $? -ne 0; then
        exit 1
    fi
    make
    if test $? -ne 0; then
        exit 1
    fi
    make install
    cd -
    #
    # Install thrift
    #
    printf "\n\033[0;32;34minstalling thrift\033[m\n"
    tar xzf $thrift.tar.gz
    cd $thrift
    # A plain configure with --with-openssl fails with
    # "Error: libcrypto required."; use CPPFLAGS and LDFLAGS instead
    ./configure --prefix=$THIRD_PARTY_HOME/thrift \
               --with-boost=$THIRD_PARTY_HOME/boost \
               --with-libevent=$THIRD_PARTY_HOME/libevent \
               CPPFLAGS="-I$THIRD_PARTY_HOME/openssl/include" \
               LDFLAGS="-ldl -L$THIRD_PARTY_HOME/openssl/lib" \
               --with-qt4=no --with-c_glib=no --with-csharp=no \
               --with-java=no --with-erlang=no --with-python=no \
               --with-perl=no --with-ruby=no --with-haskell=no \
               --with-go=no --with-d=no
    if test $? -ne 0; then
        exit 1
    fi
    # With the changes above configure succeeds, but the edits below are still
    # needed, otherwise make complains that malloc is undeclared
    sed -i -e 's!#define HAVE_MALLOC 0!#define HAVE_MALLOC 1!' config.h
    sed -i -e 's!#define HAVE_REALLOC 0!#define HAVE_REALLOC 1!' config.h
    sed -i -e 's!#define malloc rpl_malloc!/*#define malloc rpl_malloc*/!' config.h
    sed -i -e 's!#define realloc rpl_realloc!/*#define realloc rpl_realloc*/!' config.h
    make
    if test $? -ne 0; then
        exit 1
    fi
    make install
    cd -
    # Report success
    printf "\n\033[0;32;34minstall SUCCESS\033[m\n"

     If importing Hive results into MySQL fails with errors, see the difference between Hiveserver and Hiveserver2 (Appendix 2 below).

     1. Sqoop depends on ZooKeeper, so ZOOKEEPER_HOME must be set in the environment.

    2. sqoop-1.2.0-CDH3B4 depends on hadoop-core-0.20.2-CDH3B4.jar, so download hadoop-0.20.2-CDH3B4.tar.gz, unpack it, and copy hadoop-0.20.2-CDH3B4/hadoop-core-0.20.2-CDH3B4.jar into sqoop-1.2.0-CDH3B4/lib.

    3. Importing MySQL data with Sqoop depends on mysql-connector-java-.jar at runtime, so download that jar and copy it into sqoop-1.2.0-CDH3B4/lib as well.

    Insert Hive statistics directly into MySQL with a UDF:
    http://www.linuxidc.com/Linux/2013-04/82878.htm

    Save Hive results into MySQL with a Python script:
    http://pslff.diandian.com/post ... 08648

     

     Summary of Hive INSERT operations: partitions and exports

    INSERT syntax:
    
    1. Basic insert syntax:
    insert overwrite table tablename [partition(partcol1=val1,partcol2=val2)] select_statement;
    insert into table tablename [partition(partcol1=val1,partcol2=val2)] select_statement;
    eg:
    insert overwrite table test_insert select * from test_table;
    insert into table test_insert select * from test_table;
    Note:
    overwrite replaces the existing data, into appends to it.
    
    2. Inserting into multiple tables:
    from source_table
    insert overwrite table tablename1 [partition (partcol1=val1,partcol2=val2)] select_statement1
    insert overwrite table tablename2 [partition (partcol1=val1,partcol2=val2)] select_statement2
    eg:
    from test_table
    insert overwrite table test_insert1
    select key
    insert overwrite table test_insert2
    select value;
    Note: Hive does not support inserting rows one at a time with INSERT, nor does it support UPDATE. Data is loaded into existing tables with LOAD, and once imported it cannot be modified.
    
    3. Saving query results to the filesystem:
    insert overwrite [local] directory 'directory' select_statement;
    eg:
    (1) Export to a local directory:
    insert overwrite local directory '/home/hadoop/data' select * from test_insert1;
    The generated files replace everything in the target directory, i.e. files that already exist there are deleted.
    Only overwrite is allowed here; into is an error!
    (2) Export to HDFS:
    insert overwrite directory '/user/hive/warehouse/table' select value from test_table;
    Only overwrite is allowed here; into is an error!
    (3) The same query result can be written to several tables or directories at once:
    from source_table
    insert overwrite local directory '/home/hadoop/data' select * 
    insert overwrite directory '/user/hive/warehouse/table' select value;
    
    4. Summary:
    (1) The insert command is mainly used to export data from Hive; the destination can be HDFS or the local filesystem, and what gets exported depends on the SELECT statement you write.
    (2) overwrite vs. into:
    insert overwrite/into table: both are valid with tables;
    insert overwrite directory: only overwrite is valid with directories;
    

     Hive installation in detail: the approach matters most; plus Beeline

    Look ahead: many people have only ever touched the same basics; whoever has more foresight can take the shortcut. Data is not about size, it is about value.

    1. Installation
    
    yum install the Hive-related packages
    The Hive packages are:
    hive – base package that provides the complete language and runtime (required)
    hive-metastore – provides scripts for running the metastore as a standalone service (optional)
    hive-server – provides scripts for running the original HiveServer as a standalone service (optional)
    hive-server2 – provides scripts for running the new HiveServer2 as a standalone service (optional)
    
    2. Configure MySQL as the Hive metastore database
    1) Create the database
    $ mysql -u root -p
    Enter password:
    mysql> CREATE DATABASE metastore;
    mysql> USE metastore;
    mysql> SOURCE /usr/lib/hive/scripts/metastore/upgrade/mysql/hive-schema-0.10.0.mysql.sql;
    2) Create the user / grant privileges
    mysql> CREATE USER 'hive'@'metastorehost' IDENTIFIED BY 'mypassword';
    …
    mysql> REVOKE ALL PRIVILEGES, GRANT OPTION FROM 'hive'@'metastorehost';
    mysql> GRANT SELECT,INSERT,UPDATE,DELETE,LOCK TABLES,EXECUTE ON metastore.* TO 'hive'@'metastorehost';
    mysql> FLUSH PRIVILEGES;
    mysql> quit;
    
    3. Configure hive-site.xml
    a) Basic configuration (remote metastore mode)
    
    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://192.168.1.52:3306/metastore</value>
      <description>the URL of the MySQL database</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>hive</value>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>hive</value>
    </property>
    <property>
      <name>datanucleus.autoCreateSchema</name>
      <value>false</value>
    </property>
    <property>
      <name>datanucleus.fixedDatastore</name>
      <value>true</value>
    </property>
    <property>
      <name>datanucleus.autoStartMechanism</name>
      <value>SchemaTable</value>
    </property>
    <property>
      <name>hive.metastore.uris</name>
      <value>thrift://192.168.1.57:9083</value>
      <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
    </property>
    The hive.metastore.uris property above indicates that Hive is used in the third mode (remote metastore).
    Note: since Hive 0.10, hive.metastore.local no longer needs to be set when the property above is configured.
    
    4. Configure HiveServer2
    Add the following properties to hive-site.xml:
    
     <property>
      <name>hive.support.concurrency</name>
      <description>Enable Hive's Table Lock Manager Service</description>
      <value>true</value>
    </property>
    <property>
      <name>hive.zookeeper.quorum</name>
      <description>Zookeeper quorum used by Hive's Table Lock Manager</description>
      <value>zk1.myco.com,zk2.myco.com,zk3.myco.com</value>
    </property>
    Note: leaving hive.zookeeper.quorum unconfigured means concurrent Hive QL requests cannot run and can lead to data corruption.
    
    Enabling the Table Lock Manager without specifying a list of valid Zookeeper quorum nodes will result in unpredictable behavior. Make sure that both properties are properly configured.
    5. Install ZooKeeper
    HiveServer2's Table Lock Manager depends on ZooKeeper, so ZooKeeper must be installed and started; see the article "ZooKeeper installation" for details.
    Start the cluster's ZooKeeper; if ZooKeeper does not listen on the default port, explicitly set hive.zookeeper.client.port.
    
    6. Start the services
    1) Start hive-metastore
    Start the metastore service:
    sudo service hive-metastore start   (or: hive --service metastore)
    It listens on port 9083 by default.
    2) Start hive-server2
    Start HiveServer2:
    sudo service hive-server2 start
    3) Test
    Connect to hive-server2 with the beeline console:
    /usr/bin/beeline
    >!connect jdbc:hive2://localhost:10000 -n hive -p hive org.apache.hive.jdbc.HiveDriver
    Run commands such as show tables to check the result.
    
    Appendix 1: beeline options
    
    Usage: java org.apache.hive.cli.beeline.BeeLine 
       -u <database url>               the JDBC URL to connect to
       -n <username>                   the username to connect as
       -p <password>                   the password to connect as
       -d <driver class>               the driver class to use
       -e <query>                      query that should be executed
       -f <file>                       script file that should be executed
       --color=[true/false]            control whether color is used for display
       --showHeader=[true/false]       show column names in query results
       --headerInterval=ROWS           the interval between which headers are displayed
       --fastConnect=[true/false]      skip building table/column list for tab-completion
    Particularly useful options:
    --fastConnect=true   skip building the list of tables and columns for tab-completion (it really works)
    --isolation          set the transaction isolation level
    Examples:
    Run a SQL statement:
    beeline -u jdbc:hive2://localhost:10000 -n hdfs -p hdfs -e "show tables"
    Run a SQL file:
    beeline -u jdbc:hive2://localhost:10000 -n hdfs -p hdfs -f hiveql_test.sql
    
    Appendix 2: differences between hive-server1 and hive-server2
    JDBC differences between HiveServer1 and HiveServer2 (a short HiveServer2 client example follows the table):
    HiveServer version      Connection URL                  Driver Class
    
    HiveServer2             jdbc:hive2://<host>:<port>      org.apache.hive.jdbc.HiveDriver
    HiveServer1             jdbc:hive://<host>:<port>       org.apache.hadoop.hive.jdbc.HiveDriver
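
      For comparison, a minimal HiveServer2 JDBC client matching the beeline example above; the endpoint and the hive/hive credentials are the same placeholders used there.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class HiveServer2Client {
        public static void main(String[] args) throws Exception {
            // HiveServer2 driver class from the table above.
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            // Same endpoint and credentials as the beeline example; adjust as needed.
            Connection conn = DriverManager.getConnection(
                    "jdbc:hive2://localhost:10000/default", "hive", "hive");
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery("show tables");
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
            rs.close();
            stmt.close();
            conn.close();
        }
    }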
    

      
