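Goal: use Spark to write data to Kafka, simulating a production environment.
Create the topic first (Zookeeper and Kafka must already be running; the start commands are given further down)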
bin/kafka-topics.sh --zookeeper master:2181 --list
bin/kafka-topics.sh --zookeeper master:2181 --create --topic AV_LOAD_XXX --replication-factor 1 --partitions 2
Start Zookeeper in the background
nohup sudo ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
Start Kafka in the background
nohup sudo ./bin/kafka-server-start.sh config/server.properties &
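Both services start in the background, so the commands return before either one is ready, and the topic commands fail if they run too early. A minimal sketch of a readiness check, assuming bash (it relies on bash's `/dev/tcp` redirection). The function name `wait_for_port` is made up for illustration, and port 9092 is only Kafka's default listener port; the notes never state the broker address, so adjust it to your `server.properties`.

```shell
#!/usr/bin/env bash
# wait_for_port HOST PORT [TIMEOUT_SECONDS]   (hypothetical helper, not part of Kafka)
# Tries a TCP connection about once per second until it succeeds
# or TIMEOUT_SECONDS attempts have failed. Returns 0 on success, 1 on timeout.
wait_for_port() {
  local host="$1" port="$2" timeout="${3:-30}" elapsed=0
  # The subshell opens the socket on fd 3 and closes it again on exit.
  while ! (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; do
    elapsed=$((elapsed + 1))
    if [ "$elapsed" -ge "$timeout" ]; then
      return 1
    fi
    sleep 1
  done
  return 0
}

# Intended use (2181 is taken from the commands above; 9092 is an assumption):
#   wait_for_port master 2181 60 && wait_for_port master 9092 60 \
#     && bin/kafka-topics.sh --zookeeper master:2181 --list
```

An open port only shows that the process is listening, not that the broker has finished registering with Zookeeper, so treat this as a rough gate rather than a health check.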
Write data to Kafka from Spark
package tool

/**
 * Created by Administrator on 2017/3/4.
 */

import java.util.Properties

import config.{ConfigUtil, ExternalConfig}
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}

import scala.io.Source

object KafkaProducer {
  def main(args: Array[String]): Unit = {
    // Broker list and file encoding come from the project's own config helpers.
    val brokers = ExternalConfig.BROKERS_LIST
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)
    val code = ConfigUtil.getStringProperty("charCode")

    // Replay data.txt forever, one line per second, to simulate a live feed.
    while (true) {
      val source = Source.fromFile("./data.txt", code)
      try {
        for (line <- source.getLines()) {
          producer.send(new KeyedMessage[String, String]("AV_LOAD_XXX", line))
          Thread.sleep(1000)
        }
      } finally {
        source.close() // release the file handle before the next pass
      }
    }
  }
}
Submit the producer to the Spark cluster in the background
nohup spark-submit --master spark://192.168.10.10:7077 --class tool.KafkaProducer --executor-memory 4G --total-executor-cores 4 StreamClassify-producer-1.0.jar &
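The producer opens `./data.txt` relative to the directory where `spark-submit` is launched, since no `--deploy-mode` is given and the driver therefore runs locally. A missing file makes the job die with an exception. An empty file is worse: the inner loop never reaches the one-second sleep, so the outer `while (true)` spins at full speed. A minimal pre-flight sketch; the function name `check_input` is hypothetical.

```shell
#!/usr/bin/env bash
# check_input FILE   (hypothetical helper)
# Succeeds only if FILE exists, is readable, and contains at least one byte.
check_input() {
  local file="$1"
  if [ ! -r "$file" ]; then
    echo "missing or unreadable: $file" >&2
    return 1
  fi
  if [ ! -s "$file" ]; then
    echo "empty input file: $file" >&2
    return 1
  fi
  return 0
}
```

Run `check_input ./data.txt` from the launch directory and submit the job only if it succeeds.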
Check the Kafka output
bin/kafka-console-consumer.sh --zookeeper master:2181 --topic AV_LOAD_XXX
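Because the producer sleeps one second after each message, the number of lines the consumer prints in a fixed window gives a quick sanity check on the send rate. A rough sketch, assuming GNU coreutils `timeout` is installed; `count_lines_for` is a made-up helper name.

```shell
#!/usr/bin/env bash
# count_lines_for SECONDS COMMAND [ARGS...]   (hypothetical helper)
# Runs COMMAND for at most SECONDS seconds, then prints the number of
# lines it wrote to stdout. Stderr from COMMAND is discarded.
count_lines_for() {
  local seconds="$1"
  shift
  timeout "$seconds" "$@" 2>/dev/null | wc -l | tr -d ' '
}

# Intended use, from the Kafka install directory:
#   count_lines_for 30 bin/kafka-console-consumer.sh --zookeeper master:2181 --topic AV_LOAD_XXX
```

The count will come in somewhat under the window length in seconds, since the consumer needs a moment to connect before messages start arriving.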