zoukankan      html  css  js  c++  java
  • 使用SparkStreaming实现将数据写到MySQL中

    (1)在pom.xml中加入如下依赖包

     1 <dependency>
     2     <groupId>mysql</groupId>
     3     <artifactId>mysql-connector-java</artifactId>
     4     <version>5.1.38</version>
     5 </dependency>
     6 <dependency>
     7     <groupId>commons-dbcp</groupId>
     8     <artifactId>commons-dbcp</artifactId>
     9     <version>1.4</version>
    10 </dependency>
     

    (2)在MySql中创建数据库和表,命令操作如下

    1 mysql -uroot –p
    2 create database test;
    3 use test;
    4 show tables;
    5 create table streaming(item varchar(30),count int);

    (3)使用Java编写一个数据库连接池类

     1 package cn.itcast.spark.day7;
     2 
     3 import java.sql.Connection;
     4 import java.sql.DriverManager;
     5 import java.util.LinkedList;
     6 
     7 public class ConnectionPool {
     8     private static LinkedList<Connection> connectionQueue;
     9 
    10     static {
    11         try {
    12             Class.forName("com.mysql.jdbc.Driver");
    13         }catch (ClassNotFoundException e) {
    14             e.printStackTrace();
    15         }
    16     }
    17 
    18     public synchronized static Connection getConnection() {
    19         try {
    20             if (connectionQueue == null) {
    21                 connectionQueue = new LinkedList<Connection>();
    22                 for (int i = 0;i < 5;i ++) {
    23                     Connection conn = DriverManager.getConnection(
    24                             "jdbc:mysql://localhost:3306/test?characterEncoding=utf8&useSSL=true",
    25                             "root",
    26                             "root"
    27                     );
    28                     connectionQueue.push(conn);
    29                 }
    30             }
    31         }catch (Exception e) {
    32             e.printStackTrace();
    33         }
    34         return connectionQueue.poll();
    35     }
    36 
    37     public static void returnConnection(Connection conn) {
    38         connectionQueue.push(conn);
    39     }
    40 }

    (4)编写Spark代码
     1 package cn.itcast.spark.day7
     2 
     3 import org.apache.spark.{SparkConf, TaskContext}
     4 import org.apache.spark.streaming.{Seconds, StreamingContext}
     5 
     6 object sqlTest {
     7   def main(args: Array[String]){
     8 
     9     val conf = new SparkConf().setMaster("local[2]").setAppName("w")
    10     val ssc = new StreamingContext(conf,Seconds(5))
    11 
    12     val lines = ssc.socketTextStream("101.132.122.75",9999)
    13     val words = lines.flatMap(_.split(" "))
    14     val wordcount = words.map(x => (x,1)).reduceByKey(_+_)
    15     wordcount.foreachRDD(rdd => {
    16       rdd.foreachPartition(eachPartition => {
    17         val conn = ConnectionPool.getConnection();
    18         eachPartition.foreach(record => {
    19           val sql = "insert into streaming(item,count) values('" + record._1 + "'," + record._2 + ")"
    20           val stmt = conn.createStatement
    21           stmt.executeUpdate(sql)
    22         })
    23         ConnectionPool.returnConnection(conn)
    24       })
    25     })
    26     ssc.start()
    27     ssc.awaitTermination()
    28   }
    29 }
    
    

    (5)打开netcat发送数据

    1 root@spark-master:~# nc -lk 9999 
    2 spark hadoop kafka spark hadoop kafka spark hadoop kafka spark hadoop

    (6)提交

    /usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.35-bin.jar /root/Documents/SparkApps/SparkStreamApps.jar
  • 相关阅读:
    Digital Video Stabilization and Rolling Shutter Correction using Gyroscope 论文笔记
    Distortion-Free Wide-Angle Portraits on Camera Phones 论文笔记
    Panorama Stitching on Mobile
    Natural Image Stitching with the Global Similarity Prior 论文笔记 (三)
    Natural Image Stitching with the Global Similarity Prior 论文笔记(二)
    Natural Image Stitching with the Global Similarity Prior 论文笔记(一)
    ADCensus Stereo Matching 笔记
    Efficient Large-Scale Stereo Matching论文解析
    Setting up caffe on Ubuntu
    Kubernetes配置Secret访问Harbor私有镜像仓库
  • 原文地址:https://www.cnblogs.com/hmy-blog/p/7798840.html
Copyright © 2011-2022 走看看