Three steps are required:
1. Shell 1: write data to port 1234
nc localhost 1234
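Once the whole pipeline is up (see the startup-order note at the end), every line typed into this nc session becomes one Flume event. For example, typing an illustrative line such as
hello spark hello flume
should produce an event count plus word counts like (hello,2), (spark,1), (flume,1) in the IDEA console.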
2. Shell 2: start the Flume agent
cd /usr/local2/flume/bin
./flume-ng agent --conf /usr/local2/flume/conf -f /usr/local2/flume/conf/flume-to-spark.conf --name a1
3. IDEA: run the Spark Streaming program that receives the events pushed by Flume
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStream_Flume_source {
  def main(args: Array[String]): Unit = {
    val host = "localhost"
    val port = 4321                   // must match a1.sinks.k1.port in flume-to-spark.conf
    val setIntervalTime = Seconds(2)  // batch interval
    val sparkConf = new SparkConf().setAppName("Flume data source").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, setIntervalTime)
    // Push-based receiver: Flume's Avro sink sends events to this host:port
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    stream.count().map(x => "Received " + x + " flume events").print()
    // Decode each event body as UTF-8 text and count the words in every batch
    val words = stream.flatMap(x => new String(x.event.getBody.array(), "UTF-8").split(" ")).map(x => (x, 1))
    words.reduceByKey((x, y) => x + y).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
In IDEA you can then see the data that was typed in; Chinese input is also displayed correctly.
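To build the program in IDEA, the project also needs the spark-streaming-flume integration artifact in addition to spark-core and spark-streaming. A minimal build.sbt sketch, assuming Spark 2.1.0 compiled for Scala 2.11 (the version numbers are assumptions; match them to the local installation):
// build.sbt sketch; adjust the versions to the local Spark/Scala install
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "2.1.0",
  "org.apache.spark" %% "spark-streaming"       % "2.1.0",
  "org.apache.spark" %% "spark-streaming-flume" % "2.1.0"
)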
The Flume configuration /usr/local2/flume/conf/flume-to-spark.conf (a netcat source listening on port 1234 feeds a memory channel, and an Avro sink pushes the buffered events to the Spark receiver on port 4321):
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 1234

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 4321

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Note the startup order of the whole pipeline: IDEA >>>> shell 2 >>>> shell 1, otherwise an error is reported: Flume's Avro sink can only connect once the Spark receiver is already listening on port 4321, and nc can only connect once Flume's netcat source is already listening on port 1234.