既然Kafka是用Scala写的,而我最近也在慢慢学习Scala的语法,虽然还比较生疏,但还是想尝试用Scala实现Producer和Consumer,并且用自定义的HashPartitioner让消息根据key路由到指定的partition。
Producer:
import java.util.Properties
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.producer.KeyedMessage

/**
 * Demo producer that sends six keyed string messages to the "ScalaTopic"
 * topic, using [[HashPartitioner]] as the configured partitioner class.
 */
object ProducerDemo {

  /**
   * Builds the producer configuration, sends the demo messages in order and
   * always closes the producer afterwards.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val brokers = "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092"
    val topic = "ScalaTopic"

    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // Route messages through the custom partitioner defined alongside this demo.
    props.put("partitioner.class", classOf[HashPartitioner].getName)
    props.put("producer.type", "sync")
    // NOTE(review): the batch/queue settings below presumably only matter for
    // the async producer type — confirm against the Kafka producer config docs.
    props.put("batch.num.messages", "1")
    props.put("queue.buffering.max.messages", "1000000")
    props.put("queue.enqueue.timeout.ms", "20000000")

    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)

    // Flip to true to pause five seconds after each send.
    val sleepFlag = false

    // (key, payload) pairs, sent in this order.
    // NOTE(review): the last payload repeats "test 4"; it looks like a
    // copy-paste slip for "test 5" but is kept as-is to preserve the output.
    val records = Seq(
      "1" -> "test 0",
      "1" -> "test 1",
      "1" -> "test 2",
      "4" -> "test 3",
      "4" -> "test 4",
      "4" -> "test 4"
    )

    try {
      for ((key, payload) <- records) {
        producer.send(new KeyedMessage[String, String](topic, key, payload))
        if (sleepFlag) Thread.sleep(5000)
      }
    } finally {
      // Release the producer's resources even if a send fails.
      producer.close()
    }
  }
}
Consumer:
import java.util.Properties
import kafka.consumer.ConsumerConfig
import kafka.consumer.Consumer
import kafka.message.MessageAndMetadata

/**
 * Demo consumer that reads one stream of the given topic and prints every
 * message together with its topic, partition, offset, key and payload.
 */
object ConsumerDemo {

  /**
   * Entry point.
   *
   * @param args exactly three values: topic, group id, consumer id. Any other
   *             argument count prints a usage line to stderr and exits with
   *             status 1.
   */
  def main(args: Array[String]): Unit = {
    val (topic, groupid, consumerid) = args match {
      case Array(arg1, arg2, arg3) => (arg1, arg2, arg3)
      case _ =>
        System.err.println("Usage: ConsumerDemo <topic> <group-id> <consumer-id>")
        sys.exit(1)
    }

    val props = new Properties()
    props.put("zookeeper.connect", "192.168.1.151:2181,192.168.1.152:2181,192.168.1.153:2181")
    props.put("group.id", groupid)
    props.put("client.id", "test")
    props.put("consumer.id", consumerid)
    props.put("auto.offset.reset", "smallest")
    props.put("auto.commit.enable", "true")
    props.put("auto.commit.interval.ms", "100")

    val consumerConfig = new ConsumerConfig(props)
    val consumer = Consumer.create(consumerConfig)

    // Raw bytes -> text. A message may carry no key (null), which would make
    // `new String(...)` throw; render it as "null" instead.
    // NOTE(review): UTF-8 is assumed to match the producer's StringEncoder
    // encoding — confirm if a non-default serializer.encoding is in use.
    def decode(bytes: Array[Byte]): String =
      Option(bytes).map(b => new String(b, "UTF-8")).getOrElse("null")

    try {
      // Request a single stream for the topic.
      val topicCountMap = Map(topic -> 1)
      val consumerMap = consumer.createMessageStreams(topicCountMap)
      val streams = consumerMap.getOrElse(topic, Nil)

      for (stream <- streams) {
        val it = stream.iterator()
        while (it.hasNext()) {
          val record = it.next()
          val message =
            s"Topic:${record.topic}, GroupID:$groupid, Consumer ID:$consumerid, PartitionID:${record.partition}, " +
              s"Offset:${record.offset}, Message Key:${decode(record.key())}, Message Payload: ${decode(record.message())}"
          System.out.println(message)
        }
      }
    } finally {
      // Shut the connector down if the loop ends or fails.
      consumer.shutdown()
    }
  }
}
HashPartitioner:
import kafka.producer.Partitioner
import scala.math._
import kafka.utils.VerifiableProperties

/**
 * Partitioner that maps a message key to a partition index in
 * `[0, numPartitions)`: the key's own value when it is an `Int`, otherwise
 * its `hashCode`, reduced modulo the partition count.
 */
class HashPartitioner extends Partitioner {

  /**
   * Constructor accepting producer properties; the properties are ignored.
   * NOTE(review): presumably required because Kafka instantiates the
   * configured partitioner.class with a VerifiableProperties argument — confirm.
   */
  def this(verifiableProperties: VerifiableProperties) = this()

  /**
   * Chooses the partition for a key.
   *
   * @param key           message key; must not be null
   * @param numPartitions number of partitions of the topic; must be positive
   * @return a non-negative index smaller than `numPartitions`
   */
  override def partition(key: Any, numPartitions: Int): Int = {
    val hash = key match {
      case i: Int => i
      case other  => other.hashCode()
    }
    // Take the remainder first, then abs: the remainder lies strictly between
    // -numPartitions and numPartitions, so abs cannot overflow (unlike
    // abs(Int.MinValue)) and the result is never negative.
    abs(hash % numPartitions)
  }
}
运行结果:
所有消息都被路由到了Partition 1,测试成功!(说明:若该Topic有3个分区,key "1" 和 "4" 的hashCode分别为49和52,对3取模都等于1,所以两组key的消息恰好都落在同一个分区。)