  • Maven + Eclipse + Spark Streaming + Kafka Integration

    Versions:

    Maven 3.5.0, Scala IDE for Eclipse 4.6.1, spark-2.1.1-bin-hadoop2.7, kafka_2.11-0.8.2.1, JDK 1.8

    Prerequisites (covered in earlier posts):

    Installing and configuring Maven 3.5.0, with Eclipse integration

    Downloading project dependency jars with Maven and how to use them

    Bundling the dependency JARs into the package with Maven

    Using Maven scopes

    I. Specify JDK 1.8

    Add the following to the pom.xml configuration file:

    
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <encoding>UTF-8</encoding>
            <java.version>1.8</java.version>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
    
    
    and configure maven-compiler-plugin to match:

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>

    The pom.xml after these changes:

    
    
        <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>

            <groupId>Test</groupId>
            <artifactId>test</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <packaging>jar</packaging>

            <name>test</name>
            <url>http://maven.apache.org</url>

            <properties>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                <encoding>UTF-8</encoding>
                <!-- target JDK 1.8 -->
                <java.version>1.8</java.version>
                <maven.compiler.source>1.8</maven.compiler.source>
                <maven.compiler.target>1.8</maven.compiler.target>
            </properties>

            <dependencies>
                <dependency>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                    <version>3.8.1</version>
                    <scope>test</scope>
                </dependency>
            </dependencies>

            <build>
                <plugins>
                    <!-- compile for JDK 1.8 -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <configuration>
                            <source>1.8</source>
                            <target>1.8</target>
                        </configuration>
                    </plugin>

                    <!-- maven-assembly-plugin: bundle the dependencies into the jar -->
                    <plugin>
                        <artifactId>maven-assembly-plugin</artifactId>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                            <archive>
                                <manifest>
                                    <!-- left empty; the main class is passed to spark-submit instead -->
                                    <mainClass></mainClass>
                                </manifest>
                            </archive>
                        </configuration>
                        <executions>
                            <execution>
                                <id>make-assembly</id>
                                <phase>package</phase>
                                <goals>
                                    <!-- "single" replaces the deprecated "assembly" goal -->
                                    <goal>single</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </project>
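
    A quick way to confirm that the compiler settings took effect is to build a class that uses a Java 8 only feature such as a lambda; it compiles only if maven-compiler-plugin really uses source/target 1.8. A minimal sketch (the class name is illustrative, not part of the original project):

        package com.spark.main;

        import java.util.Arrays;

        public class Jdk8Check {
            public static void main(String[] args) {
                // Lambdas require source level 1.8; javac rejects this at 1.7 or below.
                Arrays.asList("maven", "spark", "kafka").forEach(s -> System.out.println(s));
            }
        }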

    II. Configure the Spark dependencies

    Check the versions of the jars under spark-2.1.1-bin-hadoop2.7/jars, then search the Maven remote repository http://mvnrepository.com for the matching artifacts.

    1. Configure spark-core_2.11-2.1.1.jar

    Add the following to pom.xml:

    
    
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
            <scope>runtime</scope>
        </dependency>

    So that the dependency is also bundled when packaging later, the <scope>provided</scope> suggested on mvnrepository.com is changed to <scope>runtime</scope>. (Note that runtime-scoped dependencies are not on the javac compile classpath; if compilation fails during mvn install, omit the <scope> element entirely so the default compile scope is used, which is both compiled against and bundled.)
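
    Once the dependency resolves, a one-off job can confirm that the Spark core classes are actually usable. A minimal sketch, assuming the classes are visible at compile time (see the scope note above) and an illustrative class name:

        package com.spark.main;

        import java.util.Arrays;

        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaSparkContext;

        public class SparkCoreCheck {
            public static void main(String[] args) {
                // Two local threads; no cluster is needed for this check.
                SparkConf conf = new SparkConf().setAppName("SparkCoreCheck").setMaster("local[2]");
                JavaSparkContext sc = new JavaSparkContext(conf);
                // Prints 3 if spark-core is on the classpath and working.
                System.out.println(sc.parallelize(Arrays.asList(1, 2, 3)).count());
                sc.stop();
            }
        }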

    2. Configure spark-streaming_2.11-2.1.1.jar

    Add the following to pom.xml:

    
    
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
            <scope>runtime</scope>
        </dependency>

    As with spark-core, <scope>provided</scope> is changed to <scope>runtime</scope> so that the dependency is bundled at packaging time.
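
    Similarly, spark-streaming can be smoke-tested without Kafka by reading from a socket. A minimal sketch, assuming a text source on localhost:9999 (for example one started with nc -lk 9999):

        package com.spark.main;

        import org.apache.spark.SparkConf;
        import org.apache.spark.streaming.Durations;
        import org.apache.spark.streaming.api.java.JavaDStream;
        import org.apache.spark.streaming.api.java.JavaStreamingContext;

        public class StreamingCheck {
            public static void main(String[] args) throws InterruptedException {
                SparkConf conf = new SparkConf().setAppName("StreamingCheck").setMaster("local[2]");
                JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));
                // Print whatever arrives on the socket in each two-second batch.
                JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
                lines.print();
                jssc.start();
                jssc.awaitTermination();
            }
        }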

    III. Configure Spark + Kafka

    1. Configure spark-streaming-kafka-0-8_2.11-2.1.1.jar

    Add the following to pom.xml:

    
    
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
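
    This artifact transitively pulls in the Kafka 0.8 client, so test messages for the streaming job can be produced from Java as well. A minimal sketch, assuming the broker address and topic name used by the job in section V (the class name is illustrative):

        package com.spark.main;

        import java.util.Properties;

        import kafka.javaapi.producer.Producer;
        import kafka.producer.KeyedMessage;
        import kafka.producer.ProducerConfig;

        public class TestProducer {
            public static void main(String[] args) {
                Properties props = new Properties();
                // Same broker as the streaming job below.
                props.put("metadata.broker.list", "192.168.168.200:9092");
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
                // Publish one message to the "test" topic.
                producer.send(new KeyedMessage<String, String>("test", "hello from TestProducer"));
                producer.close();
            }
        }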

    IV. Complete pom.xml

    
    
        <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>

            <groupId>Test</groupId>
            <artifactId>test</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <packaging>jar</packaging>

            <name>test</name>
            <url>http://maven.apache.org</url>

            <properties>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                <encoding>UTF-8</encoding>
                <!-- target JDK 1.8 -->
                <java.version>1.8</java.version>
                <maven.compiler.source>1.8</maven.compiler.source>
                <maven.compiler.target>1.8</maven.compiler.target>
            </properties>

            <dependencies>
                <dependency>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                    <version>3.8.1</version>
                    <scope>test</scope>
                </dependency>
                <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.11</artifactId>
                    <version>2.1.1</version>
                    <scope>runtime</scope>
                </dependency>
                <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-streaming_2.11</artifactId>
                    <version>2.1.1</version>
                    <scope>runtime</scope>
                </dependency>
                <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
                    <version>2.1.1</version>
                </dependency>
            </dependencies>

            <build>
                <plugins>
                    <!-- compile for JDK 1.8 -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <configuration>
                            <source>1.8</source>
                            <target>1.8</target>
                        </configuration>
                    </plugin>

                    <!-- maven-assembly-plugin: bundle the dependencies into the jar -->
                    <plugin>
                        <artifactId>maven-assembly-plugin</artifactId>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                            <archive>
                                <manifest>
                                    <!-- left empty; the main class is passed to spark-submit instead -->
                                    <mainClass></mainClass>
                                </manifest>
                            </archive>
                        </configuration>
                        <executions>
                            <execution>
                                <id>make-assembly</id>
                                <phase>package</phase>
                                <goals>
                                    <!-- "single" replaces the deprecated "assembly" goal -->
                                    <goal>single</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </project>

    V. Develop the Spark job locally, then upload it to the Spark cluster and run it

    JavaDirectKafkaCompare.java

    
    
        package com.spark.main;

        import java.util.HashMap;
        import java.util.HashSet;
        import java.util.Arrays;
        import java.util.Map;
        import java.util.Set;

        import scala.Tuple2;
        import kafka.serializer.StringDecoder;

        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.function.*;
        import org.apache.spark.streaming.api.java.*;
        import org.apache.spark.streaming.kafka.KafkaUtils;
        import org.apache.spark.streaming.Durations;

        public class JavaDirectKafkaCompare {
            public static void main(String[] args) throws Exception {
                /**
                 * setMaster("local[2]") needs at least two threads: one to
                 * receive messages and one to process them.
                 * Durations.seconds(2) sets a two-second batch interval,
                 * i.e. Kafka is read every two seconds.
                 */
                SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[2]");
                JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
                /**
                 * checkpoint("hdfs://192.168.168.200:9000/checkpoint") guards
                 * against data loss.
                 */
                jssc.checkpoint("hdfs://192.168.168.200:9000/checkpoint");
                /**
                 * Kafka connection parameters.
                 */
                Set<String> topicsSet = new HashSet<>(Arrays.asList("test"));
                Map<String, String> kafkaParams = new HashMap<>();
                kafkaParams.put("metadata.broker.list", "192.168.168.200:9092");

                // Create direct kafka stream with brokers and topics
                JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                    jssc,
                    String.class,
                    String.class,
                    StringDecoder.class,
                    StringDecoder.class,
                    kafkaParams,
                    topicsSet
                );

                // Extract the message values, filter them, and print the matches.
                /**
                 * _2() returns the second element of the tuple, i.e. the
                 * message value.
                 */
                JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
                    @Override
                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                });

                String sfzh = "432922196105276721";
                JavaDStream<String> wordCounts = lines.filter(new Function<String, Boolean>() {
                    @Override
                    public Boolean call(String s) throws Exception {
                        /**
                         * Keep only the records containing the given ID number.
                         */
                        if (s.contains(sfzh)) {
                            System.out.println("Matched record: " + s);
                            return true;
                        }
                        return false;
                    }
                });
                wordCounts.print();

                // Start the computation
                jssc.start();
                jssc.awaitTermination();
            }
        }
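
    Since the pom targets Java 8, the map and filter above can also be written with lambdas. A sketch of just those two transformations (behavior unchanged; variable names are illustrative):

        // Extract the message value from each (key, value) tuple.
        JavaDStream<String> lines = messages.map(tuple2 -> tuple2._2());
        // Keep only the records containing the ID number.
        JavaDStream<String> matches = lines.filter(s -> s.contains(sfzh));
        matches.print();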

    Right-click the project and choose Run As → Maven install. When the build succeeds, the target directory contains test-0.0.1-SNAPSHOT-jar-with-dependencies.jar; copy that jar to the SPARK_HOME/myApp directory on the Linux cluster.

    Then execute:

    
    
        cd /usr/local/spark/spark-2.1.1-bin-hadoop2.7
        bin/spark-submit --class "com.spark.main.JavaDirectKafkaCompare" --master local[4] myApp/test-0.0.1-SNAPSHOT-jar-with-dependencies.jar

    (--master local[4] runs the job in-process on that machine; point --master at the cluster master URL, e.g. spark://host:7077, to run it on the cluster proper.)

    VI. Offline Maven repository

    Download: http://pan.baidu.com/s/1eS7Ywme (password: y3qz)
