package spark import java.util.Properties import java.util.HashMap import org.apache.kafka.clients.producer._ import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ import org.apache.spark.sql.SQLContext import org.apache.spark.{ SparkContext, SparkConf } import spark.bean.orders object SelectFromOneTable { def main(args: Array[String]) { val Array(brokers, topic, wordsPerMessage) = Array("localhost:9092", "sun_test", "1") val props = new HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("producer.type", "async") val producer = new KafkaProducer[String, String](props) val sparkConf = new SparkConf().setAppName("Spark SQL Test Case").setMaster("local") val sparkContext = new SparkContext(sparkConf) val sqlContext = new SQLContext(sparkContext) val url = "jdbc:mysql://localhost:3306/sun_test?user=root&password=Sun@123"; val prop = new Properties(); val df = sqlContext.read.jdbc(url, "flow", prop).collect() for (a <- df) { println(a) val message = new ProducerRecord[String, String](topic, null, a.toString()) producer.send(message) } } }