  • Spark in IDEA: consuming Kafka data and writing it to ES

    1. Maven configuration

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>SparkToES</groupId>
      <artifactId>SparkToES</artifactId>
      <version>1.0-SNAPSHOT</version>
      <inceptionYear>2008</inceptionYear>
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <spark.version>2.2.3</spark.version>
        <scala.version>2.11.8</scala.version>
        <elasticsearch.version>6.1.2</elasticsearch.version>
      </properties>
    
      <repositories>
        <repository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </repository>
      </repositories>
    
      <pluginRepositories>
        <pluginRepository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
      </pluginRepositories>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.elasticsearch</groupId>
          <artifactId>elasticsearch-spark-20_2.11</artifactId>
          <version>${elasticsearch.version}</version>
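      <!-- provided scope: excluded from the fat jar; the connector is passed to spark-submit via the jars option instead (see section 3) -->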
      <scope>provided</scope>
        </dependency>
        <!-- spark end -->
    
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.56</version>
        </dependency>
        <dependency>
          <groupId>org.specs</groupId>
          <artifactId>specs</artifactId>
          <version>1.2.5</version>
          <scope>test</scope>
        </dependency>
      </dependencies>
    
      <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
          <!-- package all dependencies into a single jar -->
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                  <mainClass></mainClass>
                </manifest>
              </archive>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
              <args>
            <arg>-target:jvm-1.8</arg>
              </args>
            </configuration>
          </plugin>
    
        </plugins>
      </build>
      <reporting>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
          </plugin>
        </plugins>
      </reporting>
    
    </project>
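
    With the assembly plugin bound to the package phase, a plain Maven build should produce both the regular jar and the fat jar (a sketch; the artifact names follow from the coordinates above):

    mvn clean package
    # expected under target/:
    #   SparkToES-1.0-SNAPSHOT.jar
    #   SparkToES-1.0-SNAPSHOT-jar-with-dependencies.jar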

    2. A simple demo: filter the data and write it to ES

    package test1
    
    import java.text.SimpleDateFormat
    import java.util.Calendar
    
    import com.alibaba.fastjson.JSON
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark.rdd.EsSpark
    
    object WafLogToES {
    
      def main(args: Array[String]): Unit = {
    
        // "yarn-client" is not a valid master URL in Spark 2.x; use "yarn" and choose the deploy mode at submit time
        val conf = new SparkConf().setMaster("yarn").setAppName("test").set("es.nodes", "ip1").set("es.port", "9200")

        //val conf = new SparkConf().setMaster("local[3]").setAppName("scalaSpark08").set("es.nodes", "ip1").set("es.port", "9200")

        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(3))

        val topics = scala.Iterable("topic1")
    
       
        // consumer settings; the map must be typed to Object, so primitive values are boxed explicitly
        val kafkaParams = scala.collection.mutable.Map[String, Object](
          "bootstrap.servers" -> "ip3,ip2,ip4",
          "group.id" -> "kafkaConsumeGroup-01",
          "key.deserializer" -> classOf[StringDeserializer])
        kafkaParams += ("value.deserializer" -> classOf[StringDeserializer])
        kafkaParams += ("auto.offset.reset" -> "latest")
        kafkaParams += ("enable.auto.commit" -> (true: java.lang.Boolean))
        kafkaParams += ("max.poll.records" -> (500: java.lang.Integer))
    
        val data: InputDStream[ConsumerRecord[String, String]] =
          KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

        data.foreachRDD { rdd =>
          // build yesterday's monthly index name, e.g. index_201912/doc
          val dateFormat = new SimpleDateFormat("yyyyMM")
          val cal: Calendar = Calendar.getInstance()
          cal.add(Calendar.DATE, -1)
          val date = dateFormat.format(cal.getTime)
          val index = "index_" + date + "/doc"
          val json = rdd.map { consumer =>
            val v1: String = consumer.value
            val obj: com.alibaba.fastjson.JSONObject = JSON.parseObject(v1)
            val logStr: String = obj.getString("message")
            // split() takes a regex, so the "]" in the ";]" delimiter must be escaped
            val log: Array[String] = logStr.replace("<14>", "").split(";\\]")
            // keep only fields that contain "=" and have a non-empty value
            val result = log.filter { x => x.lastIndexOf("=") != -1 && (x.indexOf("=") + 1 != x.length) }
              .map { x =>
                val resultTemp = x.split("=", 2)
                // normalize timestamps: "2020-01-01T12:00:00" -> "2020-01-01 12:00:00"
                if (resultTemp(0) == "CTime" || resultTemp(0) == "LastUTime") {
                  resultTemp(1) = resultTemp(1).replace("T", " ")
                }
                // treat a literal "[]" value as empty
                if (resultTemp(1) == "[]") {
                  (resultTemp(0).toLowerCase, "")
                } else {
                  (resultTemp(0).toLowerCase, resultTemp(1))
                }
              }.toMap
            result
          }.filter(x => x.contains("a") && x.contains("b") && x.contains("c") && x.contains("d") && x.contains("e"))
            .filter(x => x.apply("f").matches("""\d*-\d*-\d* \d*:\d*:\d*""") && x.apply("f") != "") // \d matches digits; the original "d*" matched the literal letter d
            .filter(x => x.apply("g").length > 0 && x.apply("h").length > 0 && x.apply("i").length > 0 && x.apply("j").length > 0)
            .map { x =>
              // json4s is available transitively via spark-core
              import org.json4s.JsonDSL._
              import org.json4s.jackson.JsonMethods._
              // render the field map as a compact JSON string
              compact(render(x))
            }
          EsSpark.saveJsonToEs(json, index)
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }
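
    For reference, here is a minimal, self-contained sketch of the parsing logic above, run against a hypothetical sample line (the field names and values are invented; only the "<14>" prefix and ";]" delimiter come from the code):

    object ParseDemo {
      def main(args: Array[String]): Unit = {
        // hypothetical input line: syslog-style "<14>" prefix, ";]"-delimited key=value fields
        val logStr = "<14>CTime=2020-01-01T12:00:00;]SrcIP=1.2.3.4;]Empty=;]Tag=[]"
        val fields = logStr.replace("<14>", "").split(";\\]")
        val result = fields
          .filter { x => x.lastIndexOf("=") != -1 && (x.indexOf("=") + 1 != x.length) } // drops "Empty="
          .map { x =>
            val kv = x.split("=", 2)
            if (kv(0) == "CTime" || kv(0) == "LastUTime") kv(1) = kv(1).replace("T", " ")
            if (kv(1) == "[]") (kv(0).toLowerCase, "") else (kv(0).toLowerCase, kv(1))
          }.toMap
        println(result) // Map(ctime -> 2020-01-01 12:00:00, srcip -> 1.2.3.4, tag -> )
      }
    }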
    

    3. The spark-submit command

    export HADOOP_USER_NAME=队列1
    export JAVA_HOME=/application/jdk1.8.0_111
    export SPARK_HOME=/application/spark-2.2.1
    export SPARK_CONF_DIR=/application/spark-2.2.1/conf
    
    spark-submit \
    --master yarn \
    --deploy-mode client \
    --driver-memory 2G \
    --executor-memory 4G \
    --executor-cores 2 \
    --num-executors 6 \
    --queue queue_8018_01 \
    --conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
    --conf spark.yarn.executor.memoryOverhead=3096 \
    --conf spark.yarn.am.memory=2048 \
    --conf "spark.executorEnv.JAVA_HOME=/application/jdk1.8.0_111" \
    --conf "spark.yarn.appMasterEnv.JAVA_HOME=/application/jdk1.8.0_111" \
    --conf "spark.yarn.appMasterEnv.SPARK_HOME"="/application/spark-2.2.1" \
    --conf "spark.yarn.appMasterEnv.SPARK_CONF_DIR"="/application/spark-2.2.1-config" \
    --conf "spark.executorEnv.SPARK_HOME"="/application/spark-2.2.1" \
    --conf "spark.executorEnv.SPARK_CONF_DIR"="/application/spark-2.2.1-config" \
    --conf "spark.executorEnv.SCALA_HOME=/application/scala-2.11" \
    --conf "spark.yarn.appMasterEnv.SCALA_HOME=/application/scala-2.11" \
    --conf spark.executor.extraJavaOptions="-XX:InitiatingHeapOccupancyPercent=30 -XX:G1HeapRegionSize=16m" \
    --class test1.WafLogToES \
    --jars /application/spark_es/jars/spark-streaming-kafka-0-10_2.11-2.2.1.jar,/application/spark_es/jars/elasticsearch-spark-20_2.11-6.0.1.jar,/application/spark_es/jars/fastjson-1.2.56.jar,/application/jars/kafka-clients-2.0.1.jar \
    /application/waf_spark_es_001/ARS.jar
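
    Once the job is running, a quick way to spot-check that documents are landing in ES (a sketch; "ip1" is the placeholder node from the code above, and the index suffix depends on the current month):

    curl "http://ip1:9200/index_201912/_count"
    curl "http://ip1:9200/index_201912/doc/_search?size=1"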

      
