Kafka0.11之producer/consumer(Scala):
KafkaConsumer:
import java.util.Properties import org.apache.kafka.clients.consumer.KafkaConsumer import kafka.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecords object KafkaConsumer { def main(args: Array[String]): Unit = { var groupid = "ScalaGroup" var consumerid = "ScalaConsumer" var topic = "ScalaTopic" //args match { // case Array(arg1, arg2, arg3) => topic = arg1; groupid = arg2; consumerid = arg3 //} val props = new Properties() props.put("bootstrap.servers", "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092") props.put("group.id", groupid) props.put("client.id", "test") props.put("consumer.id", consumerid) // props.put("auto.offset.reset", "smallest") props.put("enable.auto.commit", "true") props.put("auto.commit.interval.ms", "100") props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") val consumer = new KafkaConsumer[String, String](props) consumer.subscribe(java.util.Arrays.asList(topic)) while (true) { val records = consumer.poll(100) for (record <- records) { println(s"Topic = ${record.topic()}, partition = ${record.partition()}, key = ${record.key()}, value = ${record.value()}") } } } }
KafkaProducer:
import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} object KafkaProducer { def main(args: Array[String]): Unit = { val brokers = "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092" // val brokers = "192.168.1.151:9092" val topic = "ScalaTopic"; val props = new Properties() props.put("bootstrap.servers", brokers) props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("partitioner.class", classOf[HashPartitioner].getName) props.put("producer.type", "sync") props.put("batch.size", "1") props.put("acks", "all") val producer = new KafkaProducer[String, String](props); val sleepFlag = false; val message1 = new ProducerRecord[String, String](topic, "1", "test 1aa"); producer.send(message1); if (sleepFlag) Thread.sleep(5000); val message2 = new ProducerRecord[String, String](topic, "1", "test 1bb"); producer.send(message2); if (sleepFlag) Thread.sleep(5000); val message3 = new ProducerRecord[String, String](topic, "1", "test 1cc"); producer.send(message3); if (sleepFlag) Thread.sleep(5000); val message4 = new ProducerRecord[String, String](topic, "4", "test 4dd"); producer.send(message4); if (sleepFlag) Thread.sleep(5000); val message5 = new ProducerRecord[String, String](topic, "4", "test 4aa"); producer.send(message5); if (sleepFlag) Thread.sleep(5000); val message6 = new ProducerRecord[String, String](topic, "3", "test 3bb"); producer.send(message6); if (sleepFlag) Thread.sleep(5000); val message7 = new ProducerRecord[String, String](topic, "2", "test 2bb"); producer.send(message7); if (sleepFlag) Thread.sleep(5000); producer.close() } }
HashPartitioner:
import java.util import scala.math._ import kafka.utils.VerifiableProperties import org.apache.kafka.clients.producer.Partitioner import org.apache.kafka.common.Cluster class HashPartitioner extends Partitioner { def this(verifiableProperties: VerifiableProperties) { this } override def partition(topic: String, key: scala.Any, keyBytes: Array[Byte], value: scala.Any, valueBytes: Array[Byte], cluster: Cluster) = { val partitionInfo = cluster.partitionsForTopic(topic) val numPartitions = partitionInfo.size() if (key.isInstanceOf[Int]) { abs(key.toString().toInt) % numPartitions } key.hashCode() % numPartitions } override def close() = { } override def configure(configs: util.Map[String, _]) = { } }