[imooc in Action] Spark Streaming Real-Time Stream Processing Project, Notes 12 (Upgraded Edition)

    Level-1 notes:

    ======
    Pull-based integration

    Writing the Flume agent: flume_pull_streaming.conf

    simple-agent.sources = netcat-source
    simple-agent.sinks = spark-sink
    simple-agent.channels = memory-channel

    simple-agent.sources.netcat-source.type = netcat
    simple-agent.sources.netcat-source.bind = hadoop000
    simple-agent.sources.netcat-source.port = 44444

    simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
    simple-agent.sinks.spark-sink.hostname = hadoop000
    simple-agent.sinks.spark-sink.port = 41414

    simple-agent.channels.memory-channel.type = memory

    simple-agent.sources.netcat-source.channels = memory-channel
    simple-agent.sinks.spark-sink.channel = memory-channel

    Note: start Flume first, then start the Spark Streaming application.
    flume-ng agent \
    --name simple-agent \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/flume_pull_streaming.conf \
    -Dflume.root.logger=INFO,console


    spark-submit \
    --class com.imooc.spark.FlumePullWordCount \
    --master local[2] \
    --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
    /home/hadoop/lib/sparktrain-1.0.jar \
    hadoop000 41414

    Level-2 notes:

    Pull-based integration with Flume (the common approach):

    Modify the Flume configuration file: change the sink name and its required properties.

    On the official site, under the "Advanced Sources" heading, there is a link to the Flume Integration Guide.

    I. Add three jars (the second is already present if the Scala project was built with Maven); a pom.xml sketch follows the list:

     groupId = org.apache.spark
     artifactId = spark-streaming-flume-sink_2.11
     version = 2.2.1

     groupId = org.scala-lang
     artifactId = scala-library
     version = 2.11.8

     groupId = org.apache.commons
     artifactId = commons-lang3
     version = 3.5
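
    For reference, a sketch of how the first and third coordinates might look in pom.xml (scala-library is omitted here because a Maven-built Scala project already carries it):

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume-sink_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.5</version>
    </dependency>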

    II. Configure the custom sink:

     agent.sinks = spark                // name of your own choosing
     agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
     agent.sinks.spark.hostname = <hostname of the local machine>        // hadoop000
     agent.sinks.spark.port = <port to listen on for connection from Spark>  // 41414
     agent.sinks.spark.channel = memoryChannel   // name of your own choosing

    III. Simply change createStream to createPollingStream, as in the sketch below.
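
    A minimal sketch of the FlumePullWordCount class named in the spark-submit command above (the 5-second batch interval and the body-decoding details are assumptions, not from the original notes):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FlumePullWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: FlumePullWordCount <hostname> <port>")
          System.exit(1)
        }
        val Array(hostname, port) = args

        // .setMaster is omitted and supplied via spark-submit; add .setMaster("local[2]") for local runs
        val sparkConf = new SparkConf().setAppName("FlumePullWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // Pull mode: Spark polls events from the custom SparkSink instead of Flume pushing them
        val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

        flumeStream.map(e => new String(e.event.getBody.array()).trim)
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .print()

        ssc.start()
        ssc.awaitTermination()
      }
    }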

    IV. Startup order:

    Start Flume -> start the application -> telnet localhost 44444

    V. Package and deploy to the server

    Comment out the .setMaster(...) call; you can delete the old sparktrain-1.0.jar first, then re-package.

    spark-submit \
    --class com.imooc.spark.FlumePullWordCount \
    --master local[2] \
    --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
    /home/hadoop/lib/sparktrain-1.0.jar \
    hadoop000 41414

    (Surprisingly, no --name here?? --name specifies the application name shown in the UI.)

    Integrating Spark Streaming with Kafka in practice:

    I. Receiver-based

    II. Direct Approach (commonly used)

    The receiver-based approach (data can be lost):

    Pick the version here: Kafka Integration Guide

    Write-Ahead Logs (introduced in Spark 1.2); a sketch of enabling them follows.
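
    A minimal sketch of turning the WAL on inside main (the checkpoint path is a placeholder; the WAL must live in a fault-tolerant directory such as HDFS):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Enable the receiver Write-Ahead Log (available since Spark 1.2)
    val sparkConf = new SparkConf()
      .setAppName("KafkaReceiverWordCount")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // The WAL is written under the checkpoint directory
    ssc.checkpoint("hdfs://hadoop000:8020/spark/checkpoint") // placeholder path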

    Make sure the modified configuration files work before moving on to coding:

    Concrete steps =>

    1. Start ZooKeeper first: ./zkServer.sh start

    2. Start Kafka: ./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

    3. Create a topic:

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic

    ./kafka-topics.sh --list --zookeeper localhost:2181

    4. Test from the console that messages can be produced and consumed normally:

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic

    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_streaming_topic

    Coding (similar to the Flume case) =>

    1. Add the dependency (check the Maven project view to confirm it was actually imported):

    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-0-8_2.11
    version = 2.2.1

    2. Create the KafkaReceiverWordCount class.

    In the main method, write the basic four lines of setup code, then bring in:

    val kafkaStream = KafkaUtils.createStream(streamingContext,
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) 

    -> introduce an array of the four arguments -> val Array(zkQuorum, group, topics, numThreads) = args

    -> check that exactly four arguments were passed -> build the topicMap:

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    -> pass topicMap into the KafkaUtils arguments

    ->messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    messages is the kafkaStream of the official example code; a full sketch of the class follows.
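
    Putting the steps above together, a minimal sketch of KafkaReceiverWordCount (the 5-second batch interval is an assumption):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object KafkaReceiverWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length != 4) {
          System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
        val Array(zkQuorum, group, topics, numThreads) = args

        val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // topic name -> number of receiver threads for that topic
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

        // messages is a DStream of (key, payload) pairs; the payload is the second element
        val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

        messages.map(_._2)
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .print()

        ssc.start()
        ssc.awaitTermination()
      }
    }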


    3. Package the jar.

    4. Deploy to the server.

    (Production machines generally cannot reach the internet, so --packages is unusable; download the jars beforehand and pass them with --jars instead; see the sketch below.)
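
    For example, a sketch of such a submission (the assembly jar path and the consumer group "test" are assumptions):

    spark-submit \
    --class com.imooc.spark.KafkaReceiverWordCount \
    --master local[2] \
    --jars /home/hadoop/lib/spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar \
    /home/hadoop/lib/sparktrain-1.0.jar \
    hadoop000:2181 test kafka_streaming_topic 1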
