zoukankan      html  css  js  c++  java
  • 使用SparkStreaming实现将数据写到MySQL中

    (1)在pom.xml中加入如下依赖包

     1 <dependency>
     2     <groupId>mysql</groupId>
     3     <artifactId>mysql-connector-java</artifactId>
     4     <version>5.1.38</version>
     5 </dependency>
     6 <dependency>
     7     <groupId>commons-dbcp</groupId>
     8     <artifactId>commons-dbcp</artifactId>
     9     <version>1.4</version>
    10 </dependency>
     

    (2)在MySQL中创建数据库和表,命令操作如下

    1 mysql -uroot -p
    2 create database test;
    3 use test;
    4 show tables;
    5 create table streaming(item varchar(30),count int);

    (3)使用Java编写一个数据库连接池类

     1 package cn.itcast.spark.day7;
     2 
     3 import java.sql.Connection;
     4 import java.sql.DriverManager;
     5 import java.util.LinkedList;
     6 
     /**
      * Minimal static JDBC connection pool for the local MySQL {@code test} database.
      *
      * <p>The first call to {@link #getConnection()} eagerly opens {@code INITIAL_SIZE}
      * connections. Borrowed connections must be handed back with
      * {@link #returnConnection(Connection)} so they can be reused.
      *
      * <p>All access to the idle queue goes through {@code synchronized static} methods,
      * because {@link LinkedList} itself is not safe for concurrent modification.
      */
     public class ConnectionPool {
         /** Number of connections opened eagerly when the pool is first used. */
         private static final int INITIAL_SIZE = 5;

         private static final String URL =
                 "jdbc:mysql://localhost:3306/test?characterEncoding=utf8&useSSL=true";
         private static final String USER = "root";
         private static final String PASSWORD = "root";

         /** Idle connections; {@code null} until the pool is first used. Guarded by the class monitor. */
         private static LinkedList<Connection> connectionQueue;

         static {
             try {
                 // Register the MySQL driver with DriverManager.
                 Class.forName("com.mysql.jdbc.Driver");
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             }
         }

         /**
          * Borrows a connection from the pool.
          *
          * <p>If no idle connection is available (pool exhausted, or the initial fill
          * failed part-way), a new connection is opened on demand instead of handing
          * the caller {@code null}.
          *
          * @return an open connection, or {@code null} only if a new connection could
          *         not be opened (the failure is printed to stderr)
          */
         public synchronized static Connection getConnection() {
             if (connectionQueue == null) {
                 connectionQueue = new LinkedList<Connection>();
                 try {
                     for (int i = 0; i < INITIAL_SIZE; i++) {
                         connectionQueue.push(openConnection());
                     }
                 } catch (Exception e) {
                     // Keep whatever was opened so far; the on-demand path below covers the rest.
                     e.printStackTrace();
                 }
             }

             Connection pooled = connectionQueue.poll();
             if (pooled != null) {
                 return pooled;
             }

             try {
                 return openConnection();
             } catch (Exception e) {
                 e.printStackTrace();
                 return null;
             }
         }

         /**
          * Hands a connection back to the pool for reuse.
          *
          * <p>Synchronized on the same monitor as {@link #getConnection()} so the idle
          * queue is never mutated concurrently. A {@code null} argument is ignored so it
          * can never be handed out to a later borrower.
          *
          * @param conn the connection to return; may be {@code null}
          */
         public synchronized static void returnConnection(Connection conn) {
             if (conn == null) {
                 return;
             }
             if (connectionQueue == null) {
                 connectionQueue = new LinkedList<Connection>();
             }
             connectionQueue.push(conn);
         }

         /** Opens one new physical connection with the pool's fixed URL and credentials. */
         private static Connection openConnection() throws java.sql.SQLException {
             return DriverManager.getConnection(URL, USER, PASSWORD);
         }
     }

    (4)编写Spark代码
     1 package cn.itcast.spark.day7
     2 
     3 import org.apache.spark.{SparkConf, TaskContext}
     4 import org.apache.spark.streaming.{Seconds, StreamingContext}
     5 
     /**
      * Streaming word count that appends every batch's counts to the MySQL table
      * `streaming(item, count)`.
      *
      * Text lines are read from a socket in 5-second batches, split on single spaces,
      * and counted per batch. Each batch inserts new rows; existing rows are not updated.
      */
     object sqlTest {

       /**
        * Starts the streaming job and blocks until it terminates.
        *
        * @param args command-line arguments (unused)
        */
       def main(args: Array[String]): Unit = {

         val conf = new SparkConf().setMaster("local[2]").setAppName("w")
         val ssc = new StreamingContext(conf, Seconds(5))

         val lines = ssc.socketTextStream("101.132.122.75", 9999)
         val words = lines.flatMap(_.split(" "))
         val wordcount = words.map(x => (x, 1)).reduceByKey(_ + _)

         wordcount.foreachRDD { rdd =>
           rdd.foreachPartition { eachPartition =>
             // Do not borrow a connection for partitions that have nothing to write.
             if (eachPartition.nonEmpty) {
               // The pool can hand back null; fail with a clear message instead of an NPE.
               val conn = Option(ConnectionPool.getConnection()).getOrElse(
                 throw new IllegalStateException("ConnectionPool returned no connection"))
               try {
                 // Words come from the socket, so bind them as parameters rather than
                 // splicing them into the SQL text (a word containing ' would break it).
                 val stmt = conn.prepareStatement("insert into streaming(item,count) values(?,?)")
                 try {
                   eachPartition.foreach { case (item, count) =>
                     stmt.setString(1, item)
                     stmt.setInt(2, count)
                     stmt.executeUpdate()
                   }
                 } finally {
                   stmt.close()
                 }
               } finally {
                 // Always give the connection back, even if an insert failed.
                 ConnectionPool.returnConnection(conn)
               }
             }
           }
         }

         ssc.start()
         ssc.awaitTermination()
       }
     }
    
    

    (5)打开netcat发送数据

    1 root@spark-master:~# nc -lk 9999 
    2 spark hadoop kafka spark hadoop kafka spark hadoop kafka spark hadoop

    (6)提交

    /usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.35-bin.jar /root/Documents/SparkApps/SparkStreamApps.jar
  • 相关阅读:
    Windows10下VirtualBox安装CentOS7网络配置:添加DNS
    Windows下LaTeX环境: SumatraPDF + notepad++/ST3
    Windows10 环境变量设置: "此环境变量太大"
    修改CMD字体后导致乱码的恢复方法
    nfs:server is not responding,still trying 原因与解决方案
    [GIT] 更新.repo目录
    linux网络栈相关
    linux mailbox模型
    虚拟机ping不通主机,主机能ping 通虚拟机问题解决
    pygame学习
  • 原文地址:https://www.cnblogs.com/hmy-blog/p/7798840.html
Copyright © 2011-2022 走看看