zoukankan      html  css  js  c++  java
  • 关于spark进行实时日志解析,保存hbase与mysql

    进行地域分析  rowkey=中国_上海_201901016  value=访问次数 
     1 areaStartAmt.foreachRDD(rdd => {
     2       rdd.foreachPartition(partitionOfRecords => {
     3 //        /**
     4 //          * *&**********************************************************************
     5 //          *注意事项1:在各个分区内进行hbase设置,开启连接  每个分区连接一次 避免每条每条数据进行连接
     6 //          * 注意事项2:在外部创建hbase与connect  是在diver端的代码  需要注意在foreachRDD算子进行的操作是在executor的操作 会报序列化错误
     7 //          * 注意事项3:从中可以看出,直接把 int 型的参数传入 Bytes.toBytes() 函数中,编译不会报错,但数据的格式发生错误,再显示时就会出现乱码,
     8 //          * 因此,在调用 Bytes.toBytes() 函数时,需要先将 int, double 型数据转换成 String 类型,此时即可正常显示。
     9 //          *  查询会出现乱码  int double等 需要  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("accountNum"), Bytes.toBytes(String.valueOf(record._2)))
    10 //          *  注意事项3:使用500条一个批次提交的sql代码执行 局部更新操作 ,数据更新不知是太慢 还是未达到500条 数据库数据不正确
    11 //          *  直接使用了 val sql1 = s"insert into area_user_amt (date,country,provence,amt)
    12 //          *  values('${datekey}','${countrykey}','${provencekey}','${amt}') ON DUPLICATE KEY UPDATE `amt`= '${amt}'"
    13 //          * 未使用预编译 与批次提交 实时更新  在集群模式下所以的分区与机器都访问数据库的次数过多 造成结果??
    14 //          *********************************************************************
    15 //          */
    16 
    17 
    18         val hbaseConf = HBaseConfiguration.create()
    19         //        hbaseConf.set("hbase.rootdir", "hdfs://hadoop01:9000/hbase")
    20         //        hbaseConf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181")
    21         hbaseConf.addResource("hbase-site.xml")
    22         val connection = ConnectionFactory.createConnection(hbaseConf)
    23         // val admin=connection.getAdmin;
    24         val table = connection.getTable(TableName.valueOf("test1"));
    25         if (partitionOfRecords.isEmpty) {
    26           println("This RDD is not null but partition is null")
    27         } else {
    28           partitionOfRecords.foreach(record => {
    29             val put = new Put(Bytes.toBytes(record._1))
    30             /*
    31              从中可以看出,直接把 int 型的参数传入 Bytes.toBytes() 函数中,编译不会报错,但数据的格式发生错误,再显示时就会出现乱码,
    32             因此,在调用 Bytes.toBytes() 函数时,需要先将 int, double 型数据转换成 String 类型,此时即可正常显示。
    33            ***********************************************************************
    34              */
    35 
    36             put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("accountNum"), Bytes.toBytes(String.valueOf(record._2)))
    37             table.put(put)
    38           })
    39         }
    40       })

    // HbaseUtil.scanDataFromHabse(table)
  • 相关阅读:
    《人月神话》读后感-何保委
    软件工程2017第二次作业随笔-何保委
    软件工程2017第一次作业随笔
    实验吧 REVERSE
    浙大ctf REVERSE
    eclipse安装
    表单
    【南京邮电】maze 迷宫解法
    看雪.TSRC 2017CTF秋季赛第三题
    使用Z3破解简单的XOR加密
  • 原文地址:https://www.cnblogs.com/hejunhong/p/10280587.html
Copyright © 2011-2022 走看看