  • February 7 study notes

    1. Vocabulary review: Jew, sandy, bark (speak sharply), tyre, suck, tray, trunk (of a tree), terminal, trend, twist, statistical, tank, sequence, thermometer, utilize, recorder, steam, steel

    2. Studied the Spark video course: https://www.bilibili.com/video/av84188605

      Configuring YARN mode

    1) Edit the Hadoop configuration file yarn-site.xml and add the following:

            <!-- Whether to start a thread that checks the physical memory each task is using
                 and kills the task if it exceeds its allocation; default is true -->
            <property>
                    <name>yarn.nodemanager.pmem-check-enabled</name>
                    <value>false</value>
            </property>
            <!-- Whether to start a thread that checks the virtual memory each task is using
                 and kills the task if it exceeds its allocation; default is true -->
            <property>
                    <name>yarn.nodemanager.vmem-check-enabled</name>
                    <value>false</value>
            </property>

    2) Edit spark-env.sh and add the following setting: YARN_CONF_DIR=/opt/module/hadoop-2.6.4/etc/hadoop

    3) Edit the configuration file spark-defaults.conf:

    spark.yarn.historyServer.address=hadoop3:18080
    spark.history.ui.port=18080

      Writing a Spark program in IDEA:

    1) Dependencies to import:

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>firstspark</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    2) Packaging plugin:

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>WordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
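
    The assembly plugin above names WordCount as its main class, but the notes do not show the program itself. A minimal sketch of such a WordCount object follows; reading the input path from args(0) is my own assumption rather than something stated in the notes.

    import org.apache.spark.{SparkConf, SparkContext}

    //Minimal WordCount sketch matching the <mainClass> configured above (illustrative only)
    object WordCount {
      def main(args: Array[String]): Unit = {
        //No master is set here; spark-submit supplies it (for example the YARN mode configured earlier)
        val conf = new SparkConf().setAppName("WordCount")
        val sc = new SparkContext(conf)

        //Split each line into words, count every word, and print the result
        //args(0) is assumed to be the input path
        sc.textFile(args(0))
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .collect()
          .foreach(println)

        sc.stop()
      }
    }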

      Reading and writing MySQL data with RDDs:

    1) Add the dependency:

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>

    2) Reading from MySQL:

    package com.atguigu

    import java.sql.DriverManager

    import org.apache.spark.rdd.JdbcRDD
    import org.apache.spark.{SparkConf, SparkContext}

    object MysqlRDD {

      def main(args: Array[String]): Unit = {

        //1. Create the Spark configuration
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")

        //2. Create the SparkContext
        val sc = new SparkContext(sparkConf)

        //3. Define the MySQL connection parameters
        val driver = "com.mysql.jdbc.Driver"
        val url = "jdbc:mysql://hadoop102:3306/rdd"
        val userName = "root"
        val passWd = "000000"

        //Create the JdbcRDD; the query needs two '?' placeholders for the lower and upper id bounds
        val rdd = new JdbcRDD(sc, () => {
          Class.forName(driver)
          DriverManager.getConnection(url, userName, passWd)
        },
          "select * from `rddtable` where `id` >= ? and `id` <= ?;",
          1,
          10,
          1,
          r => (r.getInt(1), r.getString(2))
        )

        //Print the results
        println(rdd.count())
        rdd.foreach(println)

        sc.stop()
      }
    }
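
    JdbcRDD uses the two placeholders together with the lower bound, upper bound, and partition count to split the query into one bounded sub-query per partition. A small sketch of the same read with two partitions, which would sit in the same main method as above (my illustration, not from the notes); the id range [1,10] would then be split into roughly [1,5] and [6,10]:

    //Same query, but split across two partitions (each partition runs its own bounded sub-query)
    val rdd2 = new JdbcRDD(sc, () => {
      Class.forName(driver)
      DriverManager.getConnection(url, userName, passWd)
    },
      "select * from `rddtable` where `id` >= ? and `id` <= ?;",
      1,
      10,
      2,
      r => (r.getInt(1), r.getString(2))
    )
    println(rdd2.getNumPartitions)  // 2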

    3) Writing to MySQL:

    def main(args: Array[String]) {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("MysqlWrite")
      val sc = new SparkContext(sparkConf)
      val data = sc.parallelize(List("Female", "Male", "Female"))

      //Open one connection per partition rather than one per record
      data.foreachPartition(insertData)

      sc.stop()
    }

    def insertData(iterator: Iterator[String]): Unit = {
      Class.forName("com.mysql.jdbc.Driver")
      val conn = java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "000000")
      iterator.foreach(data => {
        val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
        ps.setString(1, data)
        ps.executeUpdate()
        ps.close()
      })
      conn.close()
    }
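
    Creating a new PreparedStatement for every record works, but a common refinement is to reuse one statement per partition and send the rows as a single batch. A rough sketch of that variant under the same connection settings (insertDataBatch is my own name, not from the notes):

    def insertDataBatch(iterator: Iterator[String]): Unit = {
      Class.forName("com.mysql.jdbc.Driver")
      val conn = java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "000000")
      //Reuse one PreparedStatement and push the whole partition as a batch
      val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
      iterator.foreach { data =>
        ps.setString(1, data)
        ps.addBatch()
      }
      ps.executeBatch()
      ps.close()
      conn.close()
    }

    It is called the same way as before: data.foreachPartition(insertDataBatch).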

      Reading and writing HBase data with RDDs:

    1) Add the dependencies:

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>

    2) Reading data from HBase:

    package com.atguigu
    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.hadoop.hbase.util.Bytes
    
    object HBaseSpark {
    
      def main(args: Array[String]): Unit = {
    
        //Create the Spark configuration
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseRead")
    
        //Create the SparkContext
        val sc = new SparkContext(sparkConf)
    
        //Build the HBase configuration
        val conf: Configuration = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
        conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
    
        //Read data from HBase into an RDD
        val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
          conf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result])
    
        val count: Long = hbaseRDD.count()
        println(count)
    
        //Process each row of the HBase RDD
        hbaseRDD.foreach {
          case (_, result) =>
            val key: String = Bytes.toString(result.getRow)
            val name: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
            val color: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("color")))
            println("RowKey:" + key + ",Name:" + name + ",Color:" + color)
        }
    
        //Stop the SparkContext
        sc.stop()
      }
    
    }
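
    Instead of printing inside foreach, the rows can also be mapped into an ordinary key/value RDD for further transformations. A small sketch that would sit in the same main method after hbaseRDD is created, assuming the same info:name column (my illustration, not from the notes):

    //Turn each HBase row into a (rowKey, name) pair
    val nameRDD: RDD[(String, String)] = hbaseRDD.map {
      case (_, result) =>
        (Bytes.toString(result.getRow),
          Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))))
    }
    nameRDD.collect().foreach(println)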

    3) Writing to HBase:

    def main(args: Array[String]) {
      //Get the Spark configuration and create the SparkContext
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseApp")
      val sc = new SparkContext(sparkConf)

      //Create the HBase configuration and the JobConf
      //(uses the old-API org.apache.hadoop.hbase.mapred.TableOutputFormat, which works with JobConf)
      val conf = HBaseConfiguration.create()
      val jobConf = new JobConf(conf)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")

      //Build the HBase table descriptor
      val fruitTable = TableName.valueOf("fruit_spark")
      val tableDescr = new HTableDescriptor(fruitTable)
      tableDescr.addFamily(new HColumnDescriptor("info".getBytes))

      //(Re)create the HBase table
      val admin = new HBaseAdmin(conf)
      if (admin.tableExists(fruitTable)) {
        admin.disableTable(fruitTable)
        admin.deleteTable(fruitTable)
      }
      admin.createTable(tableDescr)

      //Convert a record into a (rowkey, Put) pair for HBase
      def convert(triple: (Int, String, Int)) = {
        val put = new Put(Bytes.toBytes(triple._1))
        put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
        put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
        (new ImmutableBytesWritable, put)
      }

      //Create an RDD of fruit records
      val initialRDD = sc.parallelize(List((1, "apple", 11), (2, "banana", 12), (3, "pear", 13)))

      //Write the RDD contents to HBase
      val localData = initialRDD.map(convert)

      localData.saveAsHadoopDataset(jobConf)

      sc.stop()
    }
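
    For comparison, the same write can also be expressed with the newer mapreduce API and saveAsNewAPIHadoopDataset. This is only a hedged sketch of that alternative (not part of the original notes); it assumes the same sc, table, and cluster settings as above:

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job

    //Configure the new-API output format for the fruit_spark table
    val newConf = HBaseConfiguration.create()
    newConf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
    newConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")

    val job = Job.getInstance(newConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    //Build (rowkey, Put) pairs and write them with the new Hadoop API
    sc.parallelize(List((1, "apple", 11), (2, "banana", 12), (3, "pear", 13)))
      .map { case (id, name, price) =>
        val put = new Put(Bytes.toBytes(id))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(price))
        (new ImmutableBytesWritable, put)
      }
      .saveAsNewAPIHadoopDataset(job.getConfiguration)

    Both variants write the same cells; the new API just avoids the deprecated JobConf-based classes.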

    3. Problem encountered: IDEA on CentOS would not accept Chinese input, only English.

    4. Plan for tomorrow: continue studying Spark.
