zoukankan      html  css  js  c++  java
  • pyspark 使用时环境设置

    1.在脚本中导入pyspark的流程

    import os 

    import sys

    spark_name = os.environ.get('SPARK_HOME',None)

    # SPARK_HOME即spark的安装目录,不用到bin级别,一般为/usr/local/spark

    if not spark_home:

        raise ValueErrorError('spark 环境没有配置好')

    # sys.path是Python的第三方包查找的路径列表,将需要导入的包的路径添加进入,避免 can't find modal xxxx

    # 这个方法应该同 spark-submit提交时添加参数 --py_files='/path/to/my/python/packages.zip',将依赖包打包成zip 添加进去 效果一致

    sys.path.insert(0,'/root/virtualenvs/my_envs/lib/python3.6/site-packages/')

    sys.path.insert(0,os.path.join(spark_name,'python')

    sys.path.insert(0,os.path.join(spark_name,'python/lib/py4j-0.10.7-src.zip'))

    # sys.path.insert(0,os.path.join(spark_name,'libexec/python'))

    # sys.path.insert(0,os.path.join(spark_name,'libexex/python/build'))

    from pyspark import SparkConf, SparkContext

    #这样就可以成功导入 SparkConf,SparkContext了,而不会出现找不到SparkConf和SparkContext的奇怪问题了。

     ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    2.设置pyspark运行时的python版本

    vi ~/.bashrc

    export PYSPARK_PYTHON=/usr/local/bin/python3 

    export PYSPARK_DRIVER_PYTHON=ipython3

    编辑完保存退出

    source ~/.bashrc

    #在centos系统上,一般来说pyspark运行时默认使用的是系统自带的Python2,而如果要想使用Python3来执行pyspark任务,就需要这样配置

    ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    3.使用pyspark处理hbase缺少jar包时需配置环境

    spark加载配置的默认目录是 SPARK_HOME/conf/spark-env.sh ,不存在此目录此文件时可自行创建

    一般来说在spark-env.sh的末尾需要添加几行

    export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)   不添加这一行可能导致java class not found 之类的异常,这个是用来声明spark运行时用到的Java class包的查找目录。

    export JAVA_HOME=/usr/java/jdk1.8.0_191-amd64/jre

    export HADOOP_HOME=/usr/local/hadoop

    export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop            

    export SPARK_MASTER_HOST=HDP-master

    export SPARK_WORKER_CORES=4     设置每个worker最多使用的核数,可设置为机器的内核数

    export SPARK_WORKER_MEMORY=4g    设置每个worker最多使用的内存

    spark处理hbase时需要一些hbase的jar包,可以在SPARK_HOME/jars/下新建一个hbase目录,然后将HBASE_HOME/lib/下面的相关包都复制过来

    (也可单独复制lib目录下的这些包 hbase*.jar ,guava-12.0.1.jar,htrace-core-3.1.0-incubating.jar , protobuf-java-2.5.0.jar )

    另外需下载把hbase的数据转换为Python可读取的jar包 spark-example-1.6.0.jar

    (下载页面地址为https://mvnrepository.com/artifact/org.apache.spark/spark-example_2.11/1.6.0-typesafe-001 )

    这样就需要将spark-env.sh中的SPARK_DIST_CLASSPATH的值修改为

    export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*

     

    4.使用spark读写hbase的相关代码流程

    host = 'master,slave1,slave2'

    hbase_table = 'TEST:test1'

    conf = {"hbase.zookeeper.quorum":host,"hbase.mapreduce.inputtable":hbase_table}

    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"

    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

    # 读取habse表中的数据到rdd

    hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable",

    "org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)

    count = hbase_rdd.count()

    one = hbase_rdd.first()            查看rdd的第一条数据tuple(rowkey,' '.join(str(json_value)))

    one_value = one[1].split(' ')

    one_value[1]    形式为'{"qualifier":"列名","timestamp":"1560533059864","columnFamily":"列簇名", "row":"0000632232_1550712079","type":"Put","value":"0"}'

           

    写入hbase

    write_table = 'student'

    write_keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"

    write_valueConv= "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    conf = {"hbase.zookeeper.quorum":host,"hbase.mapred.outputtable":table,"mapreduce.outputformat.class":"org.apache.hadoop.hbase.mapreduce.TableOutputFormat",

    "mapreduce.job.output.key.class":"org.apache.hadoop.habse.io.ImmutableBytesWritable","mapreduce.job.output.value.class":"org.apache.hadoop.io.Writable"}

    rawData = ['3,info,age,19','4,info,age,17'] # 最后将数据改成[rowkey,[rowkey,column family, column name,value]]形式写进hbase

    sc.parallelize(rawData).map(lambda x:(x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv) 

      

    spark启动后对应的进程是WORKER 和 MASTER

  • 相关阅读:
    用Docker执行Percona Server
    Java基础 笔记(七)
    VC与JavaScript交互(三) ———— JS调用C++
    4456: [Zjoi2016]旅行者|分治+最短路
    Swift语法学习之 方法
    JavaScript学习笔记二
    Latex 制作积分规则表格
    向MapReduce转换:计算共现关系
    王立平--switch case
    组队训练1 回放
  • 原文地址:https://www.cnblogs.com/Ting-light/p/11303594.html
Copyright © 2011-2022 走看看