RoundRobinPartitioner/HashPartitioner:
import java.util
import java.util.concurrent.atomic.AtomicLong
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster
class SelfRoundRobinPartitioner extends Partitioner {
val next = new AtomicLong();
override def partition(topic: String, key: scala.Any, keyBytes: Array[Byte], value: scala.Any, valueBytes: Array[Byte], cluster: Cluster) = {
val partitionInfo = cluster.partitionsForTopic(topic)
val numPartitions = partitionInfo.size()
val nextIndex = next.incrementAndGet()
val partionNum: Long = nextIndex % numPartitions
partionNum.toInt
}
override def close() = {
}
override def configure(configs: util.Map[String, _]) = {
}
}
import java.util
import scala.math._
import kafka.utils.VerifiableProperties
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster
class SelfHashPartitioner extends Partitioner {
override def partition(topic: String, key: scala.Any, keyBytes: Array[Byte], value: scala.Any, valueBytes: Array[Byte], cluster: Cluster) = {
val partitionInfo = cluster.partitionsForTopic(topic)
val numPartitions = partitionInfo.size()
if (key.isInstanceOf[Int]) {
abs(key.toString().toInt) % numPartitions
}
key.hashCode() % numPartitions
}
override def close() = {
}
override def configure(configs: util.Map[String, _]) = {
}
}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object KafkaProducer {
def main(args: Array[String]): Unit = {
val brokers = "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092"
// val brokers = "192.168.1.151:9092"
val topic = "ScalaTopic";
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// props.put("partitioner.class", classOf[SelfHashPartitioner].getName)
props.put("partitioner.class", classOf[SelfRoundRobinPartitioner].getName)
props.put("producer.type", "sync")
props.put("batch.size", "1")
props.put("acks", "all")
val producer = new KafkaProducer[String, String](props);
val sleepFlag = false;
val message1 = new ProducerRecord[String, String](topic, "1", "test 1aa");
producer.send(message1);
if (sleepFlag) Thread.sleep(5000);
val message2 = new ProducerRecord[String, String](topic, "1", "test 1bb");
producer.send(message2);
if (sleepFlag) Thread.sleep(5000);
val message3 = new ProducerRecord[String, String](topic, "1", "test 1cc");
producer.send(message3);
if (sleepFlag) Thread.sleep(5000);
val message4 = new ProducerRecord[String, String](topic, "4", "test 4dd");
producer.send(message4);
if (sleepFlag) Thread.sleep(5000);
val message5 = new ProducerRecord[String, String](topic, "4", "test 4aa");
producer.send(message5);
if (sleepFlag) Thread.sleep(5000);
val message6 = new ProducerRecord[String, String](topic, "3", "test 3bb");
producer.send(message6);
if (sleepFlag) Thread.sleep(5000);
val message7 = new ProducerRecord[String, String](topic, "2", "test 2bb");
producer.send(message7);
if (sleepFlag) Thread.sleep(5000);
producer.close()
}
}
import java.lang
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import scala.collection.JavaConversions._
object KafkaTConsumer {
def main(args: Array[String]): Unit = {
var groupid = "ScalaGroup"
var consumerid = "ScalaConsumer"
var topic = "ScalaTopic"
//args match {
// case Array(arg1, arg2, arg3) => topic = arg1; groupid = arg2; consumerid = arg3
//}
val props = new Properties()
props.put("bootstrap.servers", "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092")
props.put("group.id", groupid)
props.put("client.id", "test")
props.put("consumer.id", consumerid)
// props.put("auto.offset.reset", "smallest")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "100")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(java.util.Arrays.asList(topic))
while (true) {
val records: ConsumerRecords[String, String] = consumer.poll(100)
for (record <- records) {
println(s"Topic = ${record.topic()}, partition = ${record.partition()}, key = ${record.key()}, value = ${record.value()}")
}
}
}
}
Roud robin运行结果:
Topic = ScalaTopic, partition = 0, key = 1, value = test 1cc
Topic = ScalaTopic, partition = 0, key = 3, value = test 3bb
Topic = ScalaTopic, partition = 1, key = 1, value = test 1aa
Topic = ScalaTopic, partition = 1, key = 4, value = test 4dd
Topic = ScalaTopic, partition = 2, key = 1, value = test 1bb
Topic = ScalaTopic, partition = 2, key = 4, value = test 4aa
Topic = ScalaTopic, partition = 1, key = 2, value = test 2bb
Hash 运行结果:
Topic = ScalaTopic, partition = 1, key = 1, value = test 1aa
Topic = ScalaTopic, partition = 1, key = 1, value = test 1bb
Topic = ScalaTopic, partition = 0, key = 3, value = test 3bb
Topic = ScalaTopic, partition = 2, key = 2, value = test 2bb
Topic = ScalaTopic, partition = 1, key = 1, value = test 1cc
Topic = ScalaTopic, partition = 1, key = 4, value = test 4dd
Topic = ScalaTopic, partition = 1, key = 4, value = test 4aa