  • IDEA Spark Streaming with a Flume data source: recovering the actual input text and avoiding garbled Chinese (Scala)

    Three steps are required:

    1. Shell: write data to port 1234

    nc localhost 1234

    2. Shell: start the Flume agent

    cd /usr/local2/flume/bin

    ./flume-ng agent --conf /usr/local2/flume/conf -f /usr/local2/flume/conf/flume-to-spark.conf  --name a1

    3. IDEA: run the Spark Streaming program

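    import java.nio.charset.StandardCharsets

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object DStream_Flume_source {
      def main(args: Array[String]): Unit = {
        val host = "localhost"
        val port = 4321 // the port the Flume avro sink pushes events to
        val setIntervalTime = Seconds(2) // batch interval
        // local[2]: one thread for the receiver, one for processing
        val sparkConf = new SparkConf().setAppName("Flume data source").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, setIntervalTime)
        val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

        // Number of Flume events received in each batch
        stream.count().map(x => "Received " + x + " flume events").print()

        // Turn each event body (bytes) back into the text that was typed.
        // The charset is named explicitly so Chinese text does not depend on the
        // platform default (assumes the sender writes UTF-8).
        val words = stream
          .flatMap(x => new String(x.event.getBody.array(), StandardCharsets.UTF_8).split(" "))
          .map(x => (x, 1))
        words.reduceByKey((x, y) => x + y).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }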

    The input data now shows up in the IDEA console, and Chinese text is displayed correctly.
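The key step in the job is turning each event body, which arrives as bytes, back into a string before splitting it into words. The following is a minimal sketch of that step on plain Scala collections, without Spark or Flume, so it can be tried in isolation. The object and method names (`EventBodyWordCount`, `decodeBody`, `countWords`) are hypothetical, and decoding as UTF-8 is an assumption about what the sending terminal writes.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of Spark or Flume.
object EventBodyWordCount {

  // Copy only the readable bytes of the buffer and decode them as UTF-8
  // (assumption: the sender writes UTF-8).
  def decodeBody(body: ByteBuffer): String = {
    val view = body.duplicate() // leave the caller's buffer position untouched
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  // Same shape as the streaming job: split into words, pair each with 1, sum per word.
  def countWords(bodies: Seq[ByteBuffer]): Map[String, Int] =
    bodies
      .flatMap(b => decodeBody(b).split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val bodies = Seq("你好 flume", "你好 spark")
      .map(s => ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
    countWords(bodies).foreach(println)
  }
}
```

Naming the charset keeps the result independent of the JVM's default encoding, which can differ between a Linux shell and an IDE running on another platform.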

    Flume configuration file, /usr/local2/flume/conf/flume-to-spark.conf:

        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Describe/configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 1234

        # Describe the sink
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = localhost
        a1.sinks.k1.port = 4321

        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000000
        a1.channels.c1.transactionCapacity = 1000000

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

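    Note the startup order: the IDEA program first, then shell 2 (the Flume agent), then shell 1 (nc); any other order produces errors. The avro sink needs the Spark receiver to be listening on port 4321, and nc needs the netcat source to be listening on port 1234.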
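One way to confirm that each stage is up before starting the next is to probe its port. The sketch below is a hypothetical helper (`PortCheck.isListening` is not part of Spark or Flume) that simply attempts a TCP connection; it assumes the ports used in this post, 4321 for the Spark receiver and 1234 for the Flume netcat source.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper: reports whether something accepts TCP connections on host:port.
object PortCheck {

  def isListening(host: String, port: Int, timeoutMs: Int = 500): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false // refused, unreachable, or timed out
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Start the Flume agent only once the first check is true,
    // and nc only once the second one is true.
    println("Spark receiver (4321) listening: " + isListening("localhost", 4321))
    println("Flume netcat source (1234) listening: " + isListening("localhost", 1234))
  }
}
```

The probe opens and immediately closes a connection, so it only shows that a listener exists, not that the listener is the expected service.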

  • Original article: https://www.cnblogs.com/soyo/p/7688604.html