(1) Add the following dependencies to pom.xml (note: commons-dbcp is declared here, but the example below builds its own pool by hand, so it is not strictly required):
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
<dependency>
    <groupId>commons-dbcp</groupId>
    <artifactId>commons-dbcp</artifactId>
    <version>1.4</version>
</dependency>
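If you build with sbt instead of Maven, the equivalent coordinates would look like the sketch below (versions assumed to match the pom above):

// build.sbt (sbt equivalent of the Maven dependencies above)
libraryDependencies ++= Seq(
  "mysql" % "mysql-connector-java" % "5.1.38",
  "commons-dbcp" % "commons-dbcp" % "1.4"
)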
(2) Create the database and table in MySQL with the following commands:
mysql -uroot -p
create database test;
use test;
show tables;
create table streaming(item varchar(30),count int);
(3) Write a simple database connection pool class in Java:
package cn.itcast.spark.day7;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

public class ConnectionPool {
    private static LinkedList<Connection> connectionQueue;

    static {
        try {
            // Load the MySQL JDBC driver once, when the class is first used
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    // Lazily create a pool of 5 connections on first use, then hand one out.
    // Note: returns null if all connections are currently checked out.
    public synchronized static Connection getConnection() {
        try {
            if (connectionQueue == null) {
                connectionQueue = new LinkedList<Connection>();
                for (int i = 0; i < 5; i++) {
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://localhost:3306/test?characterEncoding=utf8&useSSL=true",
                            "root",
                            "root");
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    // Must also be synchronized: multiple partitions return connections concurrently
    public synchronized static void returnConnection(Connection conn) {
        connectionQueue.push(conn);
    }
}
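The pool hands out raw Connection objects, so callers must return them even when a write fails. A minimal Scala sketch of a borrow/return helper (a hypothetical convenience, not part of the original code) that guarantees the connection always goes back to the pool:

// Hypothetical helper: borrow a connection, run the work, always return it
def withConnection[T](work: java.sql.Connection => T): T = {
  val conn = ConnectionPool.getConnection()
  try {
    work(conn)
  } finally {
    // Runs even if work(conn) throws, so connections are never leaked
    ConnectionPool.returnConnection(conn)
  }
}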
(4) Write the Spark Streaming code:
package cn.itcast.spark.day7

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object sqlTest {
  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local[2]").setAppName("w")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("101.132.122.75", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordcount = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordcount.foreachRDD(rdd => {
      // Open one connection per partition rather than per record
      rdd.foreachPartition(eachPartition => {
        val conn = ConnectionPool.getConnection()
        // One Statement per partition, closed before the connection is returned
        val stmt = conn.createStatement()
        eachPartition.foreach(record => {
          val sql = "insert into streaming(item,count) values('" + record._1 + "'," + record._2 + ")"
          stmt.executeUpdate(sql)
        })
        stmt.close()
        ConnectionPool.returnConnection(conn)
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
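The string-concatenated SQL above is fine for a demo, but it is open to SQL injection and issues one round trip per record. A sketch of an alternative foreachPartition body, using a PreparedStatement with batching against the same table and pool:

rdd.foreachPartition(eachPartition => {
  val conn = ConnectionPool.getConnection()
  // Parameterized SQL avoids quoting problems and injection
  val ps = conn.prepareStatement("insert into streaming(item,count) values(?,?)")
  eachPartition.foreach(record => {
    ps.setString(1, record._1)
    ps.setInt(2, record._2)
    ps.addBatch()            // queue the insert
  })
  ps.executeBatch()          // one round trip for the whole partition
  ps.close()
  ConnectionPool.returnConnection(conn)
})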
(5) Start netcat and send some data:
root@spark-master:~# nc -lk 9999
spark hadoop kafka spark hadoop kafka spark hadoop kafka spark hadoop
(6) Submit the application:
/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.38-bin.jar /root/Documents/SparkApps/SparkStreamApps.jar

Make sure the connector jar passed via --driver-class-path matches the version declared in pom.xml.
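Once data is flowing, you can check the sink from the MySQL shell. Since the job plainly inserts a row per (word, count) pair for every batch, repeated words appear as multiple rows rather than a running total:

mysql -uroot -p
use test;
select * from streaming;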